跳到主要内容

SpringBatch 连接 seekdb 示例程序

本文将介绍如何使用 SpringBatch 框架和 seekdb 构建一个应用程序,实现创建表、插入数据和查询数据等基本操作。

点击下载 java-oceanbase-springbatch 示例工程

前提条件

  • 您已安装 seekdb。
  • 您已安装 JDK 1.8 和 Maven。
  • 您已安装 IntelliJ IDEA。
信息

本文档运行代码使用的工具是 IntelliJ IDEA 2021.3.2 (Community Edition) 版本,您也可以根据个人喜好选择适合自己的工具运行示例代码。

操作步骤

信息

本文中给出的操作步骤是基于 Windows 环境生成。如果您使用的是其他操作系统环境或编译器,那么操作步骤可能会略有不同。

  1. 获取 seekdb 连接串。
  2. 导入 java-oceanbase-springbatch 项目到 IDEA 中。
  3. 修改 java-oceanbase-springbatch 项目中的数据库连接信息。
  4. 运行 java-oceanbase-springbatch 项目。

步骤一:获取 seekdb 连接串

  1. 联系 seekdb 部署人员或者管理员获取相应的数据库连接串。

    mysql -hxx.xx.xx.xx -P2881 -uroot -p**** -A
  2. 根据已部署的 seekdb 填写下面 URL 的对应信息。

    信息

    application.properties 文件中需要这里的 URL 信息。

    jdbc:oceanbase://host:port/schema_name?user=$user_name&password=$password&characterEncoding=utf-8

    参数说明:

    • host:提供 seekdb 的连接 IP。应该被实际的 IP 替换,也可以使用本地 IP 及 127.0.0.1。
    • port:提供 seekdb 接端口。应该被实际的端口替换,默认是 2881,在部署 seekdb 时可自定义。
    • schema_name:需要访问的 Schema 名称。
    • user_name:通过 -u 参数指定,格式为 用户。默认用户为 root
    • password:提供账户密码。
    • characterEncoding:提供字符编码。

更多 URL 参数说明信息,请参见 数据库 URL

步骤二:导入 java-oceanbase-springbatch 项目到 IDEA 中

  1. 打开 IntelliJ IDEA,选择 File > Open... 选项。

    file

  2. 在弹出的 Open File or Project 窗口中,选择对应的项目文件,单击 OK 完成项目文件导入。

  3. IntelliJ IDEA 将会自动识别项目中的各类文件,并在 Project 工具窗口中,可以查看项目的目录结构、文件列表、模块列表、依赖关系等信息。Project 工具窗口通常位于 IntelliJ IDEA 界面的最左侧,默认情况下是打开的。如果 Project 工具窗口被关闭了,可以通过点击菜单栏中的 View > Tool Windows > Project 或者使用快捷键 Alt + 1 来重新打开它。

    信息

    当使用 IntelliJ IDEA 导入项目时,IntelliJ IDEA 会自动检测项目中的 pom.xml 文件,并根据文件中描述的依赖关系自动下载所需的依赖库,并将它们添加到项目中。

  4. 查看项目情况。

springbatch

步骤三:修改 java-oceanbase-springbatch 项目中的数据库连接信息

根据 步骤一:获取 seekdb 连接串 中的信息修改 application.properties 文件中的数据库连接信息。

示例如下:

  • 数据库驱动的名称为:com.mysql.cj.jdbc.Driver
  • seekdb 的 IP 地址为 10.10.10.1
  • 访问端口使用的是 2881。
  • 需要访问的 Schema 名称为 test
  • 连接账户是 root
  • 密码是 ******

示例代码如下:

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:oceanbase://10.10.10.1:2881/test?characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=******

spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update

spring.batch.job.enabled=false

logging.level.org.springframework=INFO
logging.level.com.example=DEBUG

步骤四:运行 java-oceanbase-springbatch 项目

  • 运行 AddDescPeopleWriterTest.java 文件。

    1. 在项目结构中找到 src > test > java 中找到 AddDescPeopleWriterTest.java 文件。
    2. 在工具菜单栏中选择 Run > Run... > AddDescPeopleWriterTest.testWrite,或直接单击右上角绿色三角形运行。
    3. 通过 IDEA 的控制台来查看项目的日志信息和输出结果。
    people_desc 表中的数据:
    PeopleDESC [name=John, age=25, desc=This is John with age 25]
    PeopleDESC [name=Alice, age=30, desc=This is Alice with age 30]
    Batch Job execution completed.
  • 运行 AddPeopleWriterTest.java 文件。

    1. 在项目结构中找到 src > test > java 中找到 AddDescPeopleWriterTest.java 文件。
    2. 在工具菜单栏中选择 Run > Run... > AddPeopleWriterTest.testWrite,或直接单击右上角绿色三角形运行。
    3. 通过 IDEA 的控制台来查看项目的日志信息和输出结果。
    people 表中的数据:
    People [name=zhangsan, age=27]
    People [name=lisi, age=35]
    Batch Job execution completed.

常见问题

1. 连接超时

如果遇到连接超时问题,可以在 JDBC URL 中配置连接超时参数:

jdbc:mysql://host:port/database?connectTimeout=30000&socketTimeout=60000

2. 字符集问题

为确保正确的字符编码,在 JDBC URL 中设置正确的字符集参数:

jdbc:mysql://host:port/database?characterEncoding=utf8&useUnicode=true

3. SSL 连接

要启用与 seekdb 的 SSL 连接,在 JDBC URL 中添加以下参数:

jdbc:mysql://host:port/database?useSSL=true&requireSSL=true

4. 账号密码中的特殊字符

如果用户名或密码包含特殊字符(如 #),需要进行 URL 编码:

String encodedPassword = URLEncoder.encode(password, "UTF-8");
提示

使用 MySQL Connector/J 8.x 时,确保账号密码不包含井号(#)。否则,可能会遇到连接错误。

项目代码介绍

点击 java-oceanbase-springbatch 下载项目代码,这是一个名为 java-oceanbase-springbatch 的压缩包。

解压后,得到一个名为 java-oceanbase-springbatch 的文件夹。目录结构如下所示:

│  pom.xml

├─.idea

├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─oceanbase
│ │ │ └─example
│ │ │ └─batch
│ │ │ │──BatchApplication.java
│ │ │ │
│ │ │ ├─config
│ │ │ │ └─BatchConfig.java
│ │ │ │
│ │ │ ├─model
│ │ │ │ ├─People.java
│ │ │ │ └─PeopleDESC.java
│ │ │ │
│ │ │ ├─processor
│ │ │ │ └─AddPeopleDescProcessor.java
│ │ │ │
│ │ │ └─writer
│ │ │ ├─AddDescPeopleWriter.java
│ │ │ └─AddPeopleWriter.java
│ │ │
│ │ └─resources
│ │ └─application.properties
│ │
│ └─test
│ └─java
│ └─com
│ └─oceanbase
│ └─example
│ └─batch
│ ├─config
│ │ └─BatchConfigTest.java
│ │
│ ├─processor
│ │ └─AddPeopleDescProcessorTest.java
│ │
│ └─writer
│ ├─AddDescPeopleWriterTest.java
│ └─AddPeopleWriterTest.java

└─target

文件说明:

  • pom.xml:Maven 项目的配置文件,包含了项目的依赖、插件、构建等信息。
  • .idea:IDE(集成开发环境)中使用的目录,用于存储项目相关的配置信息。
  • src:通常用于表示项目中存放源代码的目录。
  • main: 存放主要的源代码和资源文件的目录。
  • java: 存放 Java 源代码的目录。
  • com.oceanbase.example.batch:包名。
  • BatchApplication.java:这是应用程序的入口类,包含了应用程序的主方法。
  • config:这是配置类文件夹,包含了应用程序的配置类。
  • BatchConfig.java:这是应用程序的配置类,用于配置应用程序的一些属性和行为。
  • model:这是模型类文件夹,包含了应用程序的数据模型类。
  • People.java:这是一个人员数据模型类。
  • PeopleDESC.java:这是一个人员DESC数据模型类。
  • processor:这是处理器类文件夹,包含了应用程序的处理器类。
  • AddPeopleDescProcessor.java:这是一个添加人员DESC信息的处理器类。
  • writer:这是写入器类文件夹,包含了应用程序的写入器类。
  • AddDescPeopleWriter.java:这是一个写入人员DESC信息的写入器类。
  • AddPeopleWriter.java:这是一个写入人员信息的写入器类。
  • resources:这是资源文件夹,包含了应用程序的配置文件和其他静态资源文件。
  • application.properties:这是应用程序的配置文件,用于配置应用程序的属性。
  • test: 存放测试代码和资源文件的目录。
  • BatchConfigTest.java:这是应用程序配置类的测试类。
  • AddPeopleDescProcessorTest.java:这是添加人员DESC处理器的测试类。
  • AddDescPeopleWriterTest.java:这是写入人员DESC信息的写入器的测试类。
  • AddPeopleWriterTest.java:这是写入人员信息的写入器的测试类。
  • target: 存放编译后的 Class 文件、Jar 包等文件的目录。

pom.xml 代码介绍

信息

如果您只是想验证示例,那么请使用默认代码,无需修改。您也可以按照以下讲解,根据自己的需求修改 pom.xml 文件。

pom.xml 配置文件内容如下:

  1. 文件声明语句。

    声明本文件是一个 XML 文件,使用的 XML 版本是 1.0,字符编码方式是 UTF-8

    代码如下:

    <?xml version="1.0" encoding="UTF-8"?>
  2. 配置 POM 的命名空间和 POM 模型版本。

    1. 通过 xmlns 指定 POM 的命名空间为 http://maven.apache.org/POM/4.0.0
    2. 通过 xmlns:xsi 指定 XML 命名空间为 http://www.w3.org/2001/XMLSchema-instance
    3. 通过 xsi:schemaLocation 指定 POM 的命名空间为 http://maven.apache.org/POM/4.0.0,并指定 POM 的 XSD 文件的位置为 https://maven.apache.org/xsd/maven-4.0.0.xsd
    4. 通过 <modelVersion> 元素指定该 POM 文件使用的 POM 模型版本为 4.0.0

    代码如下:

     <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    </project>
  3. 配置父项信息。

    1. 通过 <groupId> 指定父项标识为 org.springframework.boot
    2. 通过 <artifactId> 指定父项依赖为 spring-boot-starter-parent
    3. 通过 <version> 指定父项的版本号为 2.7.11
    4. 通过 relativePath 表示父项的路径为空。

    代码如下:

     <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.11</version>
    <relativePath/>
    </parent>
  4. 配置基本信息。

    1. 通过 <groupId> 指定项目标识为 com.oceanbase
    2. 通过 <artifactId> 指定项目依赖为 java-oceanbase-springboot
    3. 通过 <version> 指定项目的版本号为 0.0.1-SNAPSHOT
    4. 通过 description 介绍项目信息为 Demo project for Spring Batch

    代码如下:

     <groupId>com.oceanbase</groupId>
    <artifactId>java-oceanbase-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>java-oceanbase-springbatch</name>
    <description>Demo project for Spring Batch</description>
  5. 配置 java 版本。

    指定项目使用的 Java 版本为 1.8。

    代码如下:

      <properties>
    <java.version>1.8</java.version>
    </properties>
  6. 配置核心依赖。

    1. 指定依赖项所属的组织为 org.springframework.boot,名称为 spring-boot-starter,通过该依赖可以使用 Spring Boot 默认支持的组件依赖,支持 Web、数据处理、安全、Test 等功能。

      代码如下:

      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      </dependency>
    2. 指定依赖项所属的组织为 org.springframework.boot,名称为 spring-boot-starter-jdbc,通过该依赖可以使用 Spring Boot 提供的 JDBC 相关功能,如连接池、数据源配置等。

      代码如下:

      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
      </dependency>
    3. 指定依赖项所属的组织为 org.springframework.boot,名称为 spring-boot-starter-test,作用范围为 test,通过该依赖可以使用 Spring Boot 提供的测试框架和工具,如 JUnit、Mockito、Hamcrest 等。

      代码如下:

      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      </dependency>
    4. 指定依赖项所属的组织为 com.oceanbase,名称为 oceanbase-client,版本号为 2.4.12,通过该依赖可以使用 seekdb 提供的客户端功能,如连接、查询、事务等。

      代码如下:

          <dependency>
      <groupId>com.oceanbase</groupId>
      <artifactId>oceanbase-client</artifactId>
      <version>2.4.12</version>
      </dependency>
    5. 指定依赖项所属的组织为 org.springframework.boot,名称为 spring-boot-starter-batch,通过该依赖可以使用 Spring Boot 提供的批处理功能。

      代码如下:

      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
      </dependency>
    6. 指定依赖项所属的组织为 org.springframework.boot,名称为 spring-boot-starter-data-jpa,通过该依赖可以使用 JPA 进行数据访问的必要依赖和配置,Spring Boot Starter Data JPA 是一个 Spring Boot 的启动器。

      代码如下:

      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
      </dependency>
    7. 指定依赖项所属的组织为 org.apache.tomcat,名称为 tomcat-jdbc,通过该依赖可以使用 Tomcat 提供的 JDBC 连接池功能,包括连接池的配置、连接的获取和释放、连接的管理等。

      代码如下:

      <dependency>
      <groupId>org.apache.tomcat</groupId>
      <artifactId>tomcat-jdbc</artifactId>
      </dependency>
    8. 指定依赖项所属测试架构为 junit,名称为 junit,版本号为 4.10,作用范围为 test,通过该依赖可以用于添加 JUnit 单元测试依赖项的配置。

      代码如下:

      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
      </dependency>
    9. 指定依赖项所属的组织为 javax.activation,名称为 javax.activation-api,版本号为 1.2.0,通过该依赖可以引入 Java Activation Framework(JAF)库。

      代码如下:

      <dependency>
      <groupId>javax.activation</groupId>
      <artifactId>javax.activation-api</artifactId>
      <version>1.2.0</version>
      </dependency>
    10. 指定依赖项所属的组织为 jakarta.persistence,名称为 jakarta.persistence-api,版本号为 2.2.3,通过该依赖可以添加 Jakarta Persistence API 依赖项的配置。 代码如下:

      <dependency>
      <groupId>jakarta.persistence</groupId>
      <artifactId>jakarta.persistence-api</artifactId>
      <version>2.2.3</version>
      </dependency>
  7. 配置 Maven 插件。

    指定依赖项所属的组织为 org.springframework.boot,名称为 spring-boot-maven-plugin,该插件用于将 Spring Boot 应用程序打包成可执行的 JAR 包或 WAR 包,并且可以直接运行。

    代码如下:

     <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>

application.properties 文件介绍

application.properties 文件用于配置数据库连接和其他相关的配置项。如数据库驱动程序、连接 URL、用户名和密码等。还包括了一些关于 JPA(Java Persistence API)和 Spring Batch 的配置,以及日志级别的设置。

  1. 数据库连接配置。

    • 通过 spring.datasource.driver 指定数据库驱动程序为 com.mysql.cj.jdbc.Driver,用于与 seekdb 建立连接。
    • 通过 spring.datasource.url 指定连接数据库的 URL。
    • 通过 spring.datasource.username 指定连接数据库的用户名。
    • 通过 spring.datasource.password 指定连接数据库的密码。

    代码如下:

    spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
    spring.datasource.url=jdbc:oceanbase://host:port/schema_name?characterEncoding=utf-8
    spring.datasource.username=user_name
    spring.datasource.password=******
  2. JPA 配置。

    • 通过 spring.jpa.show-sql 指定是否在日志中显示 SQL 语句,设置为 true,表示显示 SQL 语句。
    • 通过 spring.jpa.hibernate.ddl-auto 指定 Hibernate 的 DDL 操作行为,这里设置为 update,表示在应用程序启动时自动更新数据库结构。

    代码如下:

    spring.jpa.show-sql=true
    spring.jpa.hibernate.ddl-auto=update
  3. Spring Batch 配置:

    通过 spring.batch.job.enabled 指定是否启用 Spring Batch 作业,这里设置为 false,表示禁用自动执行批处理作业。

    代码如下:

    spring.batch.job.enabled=false
    信息

    在 Spring Batch 中,spring.batch.job.enabled 属性的作用是控制批处理作业的执行行为。

    • spring.batch.job.enabled=true (默认值):表示在 Spring Boot 应用启动时,自动运行所有定义的批处理作业。这意味着在应用启动时,Spring Batch 会自动发现并执行所有定义的作业。
    • spring.batch.job.enabled=false:表示禁用自动执行批处理作业。这通常用于开发或测试环境,或者当你希望手动控制作业执行时。设为 false 后,作业不会在应用启动时自动执行,您可以通过其他方式(如 REST 接口、命令行等)手动触发作业。

    总结来说,设置 spring.batch.job.enabled=false 可以帮助您避免在应用启动时自动运行作业,提供更大的灵活性来控制何时执行批处理作业。

  4. 日志配置:

    • 通过 logging.level.org.springframework 指定 Spring 框架的日志级别为 INFO
    • 通过 logging.level.com.example 指定应用程序自定义代码的日志级别为 DEBUG

    代码如下:

    logging.level.org.springframework=INFO
    logging.level.com.example=DEBUG

BatchApplication.java 文件介绍

BatchApplication.java 文件是 Spring Boot 应用程序的入口文件。

BatchApplication.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • SpringApplication 类:用于启动 Spring Boot 应用程序。
    • SpringBootApplication 注解:用于标记该类为 Spring Boot 应用程序的入口。

    代码如下:

        import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
  2. 定义 BatchApplication 类。

    使用 @SpringBootApplication 注解标记 BatchApplication 类为 Spring Boot 应用程序的入口。在 BatchApplication 类定义了一个静态的 main 方法作为应用程序的入口点。在该方法中,使用 SpringApplication.run 方法启动 Spring Boot 应用程序。再定义了一个名为 runBatchJob 的方法,用于运行批处理作业。

    代码如下:



    @SpringBootApplication
    public class BatchApplication {
    public static void main(String[] args) {
    SpringApplication.run(BatchApplication.class, args);
    }

    public void runBatchJob() {
    }
    }

BatchConfig.java 文件介绍

BatchConfig.java 文件用于配置批处理作业的步骤、读取器、处理器和写入器等组件。

BatchConfig.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • People 类:用于存储从数据库中读取的人员信息。
    • PeopleDESC 类:用于存储对人员信息进行转换或处理后的描述信息。
    • AddPeopleDescProcessor 类:将读取到的 People 对象转换为 PeopleDESC 对象,ItemProcessor 接口的实现类。
    • AddDescPeopleWriter 类:将 PeopleDESC 对象写入到目标位置,ItemWriter 接口的实现类。
    • Job 接口:表示一个批处理作业。
    • Step 接口:表示作业中的一个步骤。
    • EnableBatchProcessing 注解:Spring Batch 的配置注解,用于启用和配置 Spring Batch 处理功能。
    • JobBuilderFactory 类:用于创建和配置作业。
    • StepBuilderFactory 类:用于创建和配置步骤。
    • RunIdIncrementer 类:Spring Batch 的运行 ID(Run ID)自增器,用于在每次运行作业时增加运行的 ID。
    • ItemProcessor 接口:用于对读取到的项进行处理或转换。
    • ItemReader 接口:用于从数据源中读取项。
    • ItemWriter 接口:用于将处理或转换后的项写入到指定的目标位置。
    • JdbcCursorItemReader 类:用于从数据库中读取数据并返回游标结果集。
    • Autowired 注解:用于进行依赖注入。
    • Bean 注解:用于创建和配置 Bean。
    • ComponentScan 注解:用于指定要进行组件扫描的包或类。
    • Configuration 注解:用于将类标记为配置类。
    • EnableAutoConfiguration 注解:用于启用 Spring Boot 的自动配置。
    • SpringBootApplication 注解:用于标记该类为 Spring Boot 应用程序的入口。
    • DataSource 接口:用于表示数据库的连接。

    代码如下:

    import com.oceanbase.example.batch.model.People;
    import com.oceanbase.example.batch.model.PeopleDESC;
    import com.oceanbase.example.batch.processor.AddPeopleDescProcessor;
    import com.oceanbase.example.batch.writer.AddDescPeopleWriter;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.JdbcCursorItemReader;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.BeanPropertyRowMapper;

    import javax.sql.DataSource;
  2. 定义 BatchConfig 类。

    这是一个简单的 Spring Batch 批处理作业。它定义了数据的读取、处理和写入方式,并将这些步骤封装为一个作业。使用 Spring Batch 的注解和自动配置功能,通过配置类中的各个 @Bean 方法,可以创建相应的组件实例,并在 step1 中使用这些组件完成数据的读取、处理和写入。

    • 通过 @Configuration 表示这个类是一个配置类。
    • 通过 @EnableBatchProcessing 启用 Spring Batch 的处理功能,该注解会自动创建必要的 bean,如 JobRepositoryJobLauncher 等。
    • 通过 @SpringBootApplication 用于 Spring Boot 应用的主类注解,是 Spring Boot 应用的启动点。
    • 通过 @ComponentScan 指定要进行组件扫描的包,告诉 Spring 在这个包和子包中扫描和注册所有的组件。
    • 通过 @EnableAutoConfiguration 自动配置 Spring Boot 应用程序的基础设施。

    代码如下:

     @Configuration
    @EnableBatchProcessing
    @SpringBootApplication
    @ComponentScan("com.oceanbase.example.batch.writer")
    @EnableAutoConfiguration
    public class BatchConfig {
    }
    1. 定义 @Autowired 注解。

      通过 @Autowired 注解将 JobBuilderFactoryStepBuilderFactoryDataSource 注入到 BatchConfig 类中的成员变量中。JobBuilderFactory 用于创建和配置作业(Job)的工厂类,StepBuilderFactory 用于创建和配置步骤(Step)的工厂类,DataSource 用于获取数据库连接的接口。

      代码如下:

      @Autowired
      private JobBuilderFactory jobBuilderFactory;

      @Autowired
      private StepBuilderFactory stepBuilderFactory;

      @Autowired
      private DataSource dataSource;
    2. 定义 @Bean 注解。

      使用 @Bean 注解定义了几个方法,用于创建批处理作业的读取器、处理器、写入器、步骤和作业。

      • 通过 peopleReader 方法创建一个 ItemReader 组件实例,该组件使用 JdbcCursorItemReader 来读取数据库中的 People 对象数据。设置数据源 dataSource、设置 RowMapper 来将数据库行映射为 People 对象,设置 SQL 查询语句为 SELECT * FROM people

      • 通过 addPeopleDescProcessor 方法创建一个 ItemProcessor 组件实例,该组件使用 AddPeopleDescProcessor 来处理 People 对象,返回转换为 PeopleDESC 对象。

      • 通过 addDescPeopleWriter 方法创建一个 ItemWriter 组件实例,该组件使用 AddDescPeopleWriter 来将 PeopleDESC 对象写入到目标位置。

      • 通过 step1 方法创建一个 Step 组件实例,该步骤的名称为 step1,通过 stepBuilderFactory.get 获取步骤构建器,设置读取器为 ItemReader 组件,设置处理器为 ItemProcessor 组件,设置写入器为 ItemWriter 组件,设置 chunk 大小为 10,最后调用 build 构建并返回配置完成的 Step

      • 通过 importJob 方法创建一个 Job 组件实例,该作业的名称为 importJob,通过 jobBuilderFactory.get 获取作业构建器,设置增量器为 RunIdIncrementer,设定作业 flow 的最初步骤为 Step,最后调用 build 构建并返回配置完成的 Job

        代码如下:

        @Bean
        public ItemReader<People> peopleReader() {
        JdbcCursorItemReader<People> reader = new JdbcCursorItemReader<>();
        reader.setDataSource((javax.sql.DataSource) dataSource);
        reader.setRowMapper(new BeanPropertyRowMapper<>(People.class));
        reader.setSql("SELECT * FROM people");
        return reader;
        }

        @Bean
        public ItemProcessor<People, PeopleDESC> addPeopleDescProcessor() {
        return new AddPeopleDescProcessor();
        }

        @Bean
        public ItemWriter<PeopleDESC> addDescPeopleWriter() {
        return new AddDescPeopleWriter();
        }

        @Bean
        public Step step1(ItemReader<People> reader, ItemProcessor<People, PeopleDESC> processor,
        ItemWriter<PeopleDESC> writer) {
        return stepBuilderFactory.get("step1")
        .<People, PeopleDESC>chunk(10)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .build();
        }

        @Bean
        public Job importJob(Step step1) {
        return jobBuilderFactory.get("importJob")
        .incrementer(new RunIdIncrementer())
        .flow(step1)
        .end()
        .build();
        }

People.java 文件介绍

People.java 文件是创建一个 People 类的数据模型,表示一个人的信息。该类包含了两个私有成员变量 nameage,以及相应的 gettersetter 方法。最后重写 toString 方法用于打印对象的信息。其中 name 表示人的姓名,age 表示人的年龄。通过 gettersetter 方法可以获取和设置这些属性的值。

该类的作用是为批处理程序的输入和输出提供一种存储和传递数据的方式。在批处理的读取和写入操作中,使用 People 对象来存储数据,通过 setter 方法设置数据,通过 getter 方法获取数据。

代码如下:

    public class People {
private String name;
private int age;

// getters and setters

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "People [name=" + name + ", age=" + age + "]";
}
// Getters and setters
}

PeopleDESC.java 文件介绍

PeopleDESC.java 文件是创建一个 PeopleDESC 类的数据模型,用于表示人员的信息,PeopleDESC 类有四个属性:nameagedescid,分别表示人的姓名、年龄、描述和标识。该类包含了相应的 gettersetter 方法来访问和设置属性的值。重写 toString 方法用于返回类的字符串表示形式,包含姓名、年龄和描述。

People 类类似,PeopleDESC 类也用于在批处理程序的输入和输出中存储和传递数据。

代码如下:

    public class PeopleDESC {
private String name;
private int age;
private String desc;
private int id;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

@Override
public String toString() {
return "PeopleDESC [name=" + name + ", age=" + age + ", desc=" + desc + "]";
}
}

AddPeopleDescProcessor.java 文件介绍

AddPeopleDescProcessor.java 文件定义一个名为 AddPeopleDescProcessor 的类,实现 ItemProcessor 接口,用于将 People 对象转换为 PeopleDESC 对象。

AddPeopleDescProcessor.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • People 类:用于存储从数据库中读取的人员信息。
    • PeopleDESC 类:用于存储对人员信息进行转换或处理后的描述信息。
    • ItemProcessor 接口:用于对读取到的项进行处理或转换。

    代码如下:

    import com.oceanbase.example.batch.model.People;
    import com.oceanbase.example.batch.model.PeopleDESC;
    import org.springframework.batch.item.ItemProcessor;
  2. 定义 AddPeopleDescProcessor 类。

    ItemProcessor 接口的 AddPeopleDescProcessor 类用于将 People 对象转换为 PeopleDESC 对象,实现在批处理过程中对输入的数据的处理逻辑。

    在该类的 process 方法中,首先创建一个 PeopleDESC 对象 desc,然后通过 item 参数获取 People 对象的属性(nameage),将这些属性设置到 desc 对象中。同时,也为 desc 对象的 desc 属性赋值,赋值的逻辑为根据 People 对象的属性生成一段描述信息。最后,返回通过处理后的 PeopleDESC 对象。

    代码如下:

    public class AddPeopleDescProcessor implements ItemProcessor<People, PeopleDESC> {
    @Override
    public PeopleDESC process(People item) throws Exception {
    PeopleDESC desc = new PeopleDESC();
    desc.setName(item.getName());
    desc.setAge(item.getAge());
    desc.setDesc("This is " + item.getName() + " with age " + item.getAge());
    return desc;
    }
    }

AddDescPeopleWriter.java 文件介绍

AddDescPeopleWriter.java 文件实现了 ItemWriter 接口的 AddDescPeopleWriter 类,用于将 People 对象写入到数据库。

AddDescPeopleWriter.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • PeopleDESC 类:用于存储对人员信息进行转换或处理后的描述信息。
    • ItemWriter 接口:用于将处理或转换后的项写入到指定的目标位置。
    • Autowired 注解:用于进行依赖注入。
    • JdbcTemplate 类:提供执行 SQL 语句的方法。
    • List 接口:用于操作查询结果集合。

    代码如下:

    import com.oceanbase.example.batch.model.PeopleDESC;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;

    import java.util.List;
  2. 定义 AddDescPeopleWriter 类。

    1. 使用 @Autowired 注解将 JdbcTemplate 实例自动注入进来,在写入数据时使用该实例执行数据库操作。

      代码如下:

          @Autowired
      private JdbcTemplate jdbcTemplate;
    2. write 方法中,对传入的 List<? extends PeopleDESC> 进行遍历,依次取出每个 PeopleDESC 对象。首先,执行 SQL 语句 DROP TABLE people_desc,用于删除可能已存在的名为 people_desc 的表。然后,执行 SQL 语句 CREATE TABLE people_desc (id INT PRIMARY KEY, name VARCHAR2(255), age INT, description VARCHAR2(255)),用于创建一个名为 people_desc 的表,表中包含了 idnameagedescription 四个列。接下来,使用 SQL 语句 INSERT INTO people_desc (id, name, age, description) VALUES (?, ?, ?, ?),将每个 PeopleDESC 对象的属性值分别插入到 people_desc 表中。

      代码如下:

          @Override
      public void write(List<? extends PeopleDESC> items) throws Exception {
      // 先删除可能存在的表
      jdbcTemplate.execute("DROP TABLE people_desc");
      // 建表语句
      String createTableSql = "CREATE TABLE people_desc (id INT PRIMARY KEY, name VARCHAR2(255), age INT, description VARCHAR2(255))";
      jdbcTemplate.execute(createTableSql);
      for (PeopleDESC item : items) {
      String sql = "INSERT INTO people_desc (id, name, age, description) VALUES (?, ?, ?, ?)";
      jdbcTemplate.update(sql, item.getId(), item.getName(), item.getAge(), item.getDesc());
      }
      }

AddPeopleWriter.java 文件介绍

AddPeopleWriter.java 文件实现了 ItemWriter 接口的 AddDescPeopleWriter 类,用于将 PeopleDESC 对象写入到数据库。

AddPeopleWriter.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • People 类:用于存储从数据库中读取的人员信息。
    • ItemWriter 接口:用于将处理或转换后的项写入到指定的目标位置。
    • Autowired 注解:用于进行依赖注入。
    • JdbcTemplate 类:提供执行 SQL 语句的方法。
    • Component 注解:用于将该类标记为 Spring 组件。
    • List 接口:用于操作查询结果集合。

    代码如下:

    import com.oceanbase.example.batch.model.People;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;

    import java.util.List;
  2. 定义 AddPeopleWriter 类。

    1. 使用 @Autowired 注解将 JdbcTemplate 实例自动注入进来,在写入数据时使用该实例执行数据库操作。

      代码如下:

          @Autowired
      private JdbcTemplate jdbcTemplate;
    2. write 方法中,对传入的 List<? extends People> 进行遍历,依次取出每个 People 对象。首先,执行 SQL 语句 DROP TABLE people,用于删除可能已存在的名为 people 的表。然后,执行 SQL 语句 CREATE TABLE people (name VARCHAR2(255), age INT),用于创建一个名为 people 的表,表中包含了 nameage 两个列。接下来,使用 SQL 语句 INSERT INTO people (name, age) VALUES (?, ?),将每个 People 对象的属性值分别插入到 people 表中。

      代码如下:

      @Override
      public void write(List<? extends People> items) throws Exception {
      // 先删除可能存在的表
      jdbcTemplate.execute("DROP TABLE people");
      // 建表语句
      String createTableSql = "CREATE TABLE people (name VARCHAR2(255), age INT)";
      jdbcTemplate.execute(createTableSql);
      for (People item : items) {
      String sql = "INSERT INTO people (name, age) VALUES (?, ?)";
      jdbcTemplate.update(sql, item.getName(), item.getAge());
      }
      }

BatchConfigTest.java 文件介绍

BatchConfigTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 Spring Batch 的作业配置。

BatchConfigTest.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • Assert 类:用于断言测试结果。
    • Test 注解:用于标记测试方法。
    • RunWith 注解:用于指定测试运行器。
    • Job 接口:表示一个批处理作业。
    • JobExecution 类:用于表示批处理作业的执行。
    • JobParameters 类:用于表示批处理作业的参数。
    • JobParametersBuilder 类:用于构建批处理作业的参数。
    • JobLauncher 接口:用于启动批处理作业。
    • Autowired 注解:用于进行依赖注入。
    • SpringBootTest 注解:用于指定测试类为 Spring Boot 测试。
    • SpringRunner 类:用于指定测试运行器为 SpringRunner。

    代码如下:

    import org.junit.Assert;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    import javax.batch.runtime.BatchStatus;
    import java.util.UUID;
  2. 定义 BatchConfigTest 类。

    通过使用 SpringBootTest 注解和 SpringRunner 运行器,可以进行 Spring Boot 的集成测试。在 testJob 方法中,使用 JobLauncherTestUtils 辅助类启动批处理作业,并使用断言来验证作业的执行状态。

    1. 使用 @Autowired 注解自动注入 JobLauncherTestUtils 实例。

      代码如下:

      @Autowired
      private JobLauncherTestUtils jobLauncherTestUtils;
    2. 使用 @Test 注解标记 testJob 方法为测试方法。在该方法中,首先创建 JobParameters 对象,然后使用 jobLauncherTestUtils.launchJob 方法启动批处理作业,并使用 Assert.assertEquals 方法断言作业的执行状态为 COMPLETED

      代码如下:

      @Test
      public void testJob() throws Exception {
      JobParameters jobParameters = new JobParametersBuilder()
      .addString("jobParam", "paramValue")
      .toJobParameters();

      JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);

      Assert.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
      }
    3. 使用 @Autowired 注解自动注入 JobLauncher 实例。

      代码如下:

      @Autowired
      private JobLauncher jobLauncher;
    4. 使用 @Autowired 注解自动注入 Job 实例。

      代码如下:

      @Autowired
      private Job job;
    5. 定义了一个名为 JobLauncherTestUtils 的内部类,用于辅助启动批处理作业。在该类中定义了一个 launchJob 方法,用于启动批处理作业。在该方法中,使用 jobLauncher.run 方法启动作业,并返回作业的执行结果。

      代码如下:

      private class JobLauncherTestUtils {
      public JobExecution launchJob(JobParameters jobParameters) throws Exception {
      return jobLauncher.run(job, jobParameters);
      }
      }

AddPeopleDescProcessorTest.java 文件介绍

AddPeopleDescProcessorTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 Spring Batch 的作业配置。

AddPeopleDescProcessorTest.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • People 类:用于存储从数据库中读取的人员信息。
    • PeopleDESC 类:用于存储对人员信息进行转换或处理后的描述信息。
    • Assert 类:用于验证测试中的预期结果和实际结果是否一致。
    • Test 注解:用于标记测试方法。
    • RunWith 注解:用于指定测试运行器。
    • Autowired 注解:用于进行依赖注入。
    • SpringBootTest 注解:用于指定测试类为 Spring Boot 测试。
    • SpringRunner 类:用于指定测试运行器为 SpringRunner。

    代码如下:

    import com.oceanbase.example.batch.model.People;
    import com.oceanbase.example.batch.model.PeopleDESC;
    import org.junit.Assert;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
  2. 定义 AddPeopleDescProcessorTest 类。

    通过使用 SpringBootTest 注解和 SpringRunner 运行器,进行 Spring Boot 的集成测试。

    1. 使用 @Autowired 注解自动注入 AddPeopleDescProcessor 实例。

      代码如下:

      @Autowired
      private AddPeopleDescProcessor processor;
    2. 使用 @Test 注解标记 testProcess 方法为测试方法。在该方法中,首先创建一个 People 对象,然后使用 processor.process 方法处理该对象,并将结果赋值给一个 PeopleDESC 对象。

      代码如下:

      @Test
      public void testProcess() throws Exception {
      People people = new People();
      people.setName("John");
      people.setAge(25);

      PeopleDESC desc = processor.process(people);
      }

AddDescPeopleWriterTest.java 文件介绍

AddDescPeopleWriterTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 AddDescPeopleWriter 的写入逻辑。

AddDescPeopleWriterTest.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • PeopleDESC 类:用于存储对人员信息进行转换或处理后的描述信息。
    • Assert 类:用于断言测试结果。
    • Test 注解:用于标记测试方法。
    • RunWith 注解:用于指定测试运行器。
    • Autowired 注解:用于进行依赖注入。
    • SpringBootTest 注解:用于指定测试类为 Spring Boot 测试。
    • JdbcTemplate 类:提供执行 SQL 语句的方法。
    • SpringRunner 类:用于指定测试运行器为 SpringRunner。
    • ArrayList 类,用于创建一个空的列表。
    • List 接口:用于操作查询结果集合。

    代码如下:

    import com.oceanbase.example.batch.model.PeopleDESC;
    import org.junit.Assert;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.test.context.junit4.SpringRunner;

    import java.util.ArrayList;
    import java.util.List;
  2. 定义 AddDescPeopleWriterTest 类。

    通过使用 SpringBootTest 注解和 SpringRunner 运行器,进行 Spring Boot 的集成测试。

    1. 使用 @Autowired 注入实例。 使用 @Autowired 注解自动注入 AddPeopleDescProcessorJdbcTemplate 实例。

      代码如下:

      @Autowired
      private AddDescPeopleWriter writer;
      @Autowired
      private JdbcTemplate jdbcTemplate;
    2. 使用 @Test 测试数据的插入和输出。 使用 @Test 注解标记 testWrite 方法为测试方法。在该方法中,首先创建一个空的 peopleDescList 列表,并向列表中添加两个 PeopleDESC 对象。然后使用 writer.write 方法将列表中的数据写入到数据库中。接着使用 jdbcTemplate 执行查询语句,获取 people_desc 表中的数据,并使用断言语句验证数据的正确性。最后,将查询结果输出到控制台,并输出作业执行完成的信息。

      1. 插入数据到 people_desc 表。 首先创建了一个空的 PeopleDESC 对象列表 peopleDescList。其次创建了两个 PeopleDESC 对象 desc1desc2,并分别设置了它们的属性值。将 desc1desc2 添加到 peopleDescList 列表中。接下来调用了 writerwrite 方法,将 peopleDescList 中的对象写入数据库中的 people_desc 表。然后使用 JdbcTemplate 执行了一条查询语句 SELECT COUNT(*) FROM people_desc,获取了 people_desc 表中的记录数,并将结果赋值给变量 count。最后使用 Assert.assertEquals 方法进行断言,验证 count 的值是否等于 2

        代码如下:

           List<PeopleDESC> peopleDescList = new ArrayList<>();
        PeopleDESC desc1 = new PeopleDESC();
        desc1.setId(1);
        desc1.setName("John");
        desc1.setAge(25);
        desc1.setDesc("This is John with age 25");
        peopleDescList.add(desc1);
        PeopleDESC desc2 = new PeopleDESC();
        desc2.setId(2);
        desc2.setName("Alice");
        desc2.setAge(30);
        desc2.setDesc("This is Alice with age 30");
        peopleDescList.add(desc2);
        writer.write(peopleDescList);

        String selectSql = "SELECT COUNT(*) FROM people_desc";
        int count = jdbcTemplate.queryForObject(selectSql, Integer.class);
        Assert.assertEquals(2, count);
      2. 输出 people_desc 表中的数据。 首先使用 JdbcTemplate 执行了一条查询语句 SELECT * FROM people_desc,并使用 lambda 表达式处理查询结果。在 lambda 表达式中,使用 rs.getIntrs.getString 等方法获取查询结果集中的字段值,并将字段值设置到一个新创建的 PeopleDESC 对象中。将每个新创建的 PeopleDESC 对象添加到一个结果列表 resultDesc 中。之后打印了一行提示信息 people_desc 表中的数据:,然后使用 for 循环遍历 resultDesc 列表中的每个 PeopleDESC 对象,并使用 System.out.println 打印输出每个对象的内容。最后打印了一条作业执行完成的信息。

        代码如下:

        List<PeopleDESC> resultDesc = jdbcTemplate.query("SELECT * FROM people_desc", (rs, rowNum) -> {
        PeopleDESC desc = new PeopleDESC();
        desc.setId(rs.getInt("id"));
        desc.setName(rs.getString("name"));
        desc.setAge(rs.getInt("age"));
        desc.setDesc(rs.getString("description"));
        return desc;
        });

        System.out.println("people_desc 表中的数据:");
        for (PeopleDESC desc : resultDesc) {
        System.out.println(desc);
        }

        // 输出作业执行完成后的信息
        System.out.println("Batch Job execution completed.");

AddPeopleWriterTest.java 文件介绍

AddPeopleWriterTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 AddPeopleWriterTest 的写入逻辑。

AddPeopleWriterTest.java 文件的代码主要包括以下几个部分:

  1. 引用其他类和接口。

    声明当前文件包含以下接口和类:

    • People 类:用于存储从数据库中读取的人员信息。
    • Test 注解:用于标记测试方法。
    • RunWith 注解:用于指定测试运行器。
    • Autowired 注解:用于进行依赖注入。
    • SpringBootApplication 注解:用于标记该类为 Spring Boot 应用程序的入口。
    • SpringBootTest 注解:用于指定测试类为 Spring Boot 测试。
    • ComponentScan 注解:用于指定要进行组件扫描的包或类。
    • JdbcTemplate 类:提供执行 SQL 语句的方法。
    • SpringRunner 类:用于指定测试运行器为 SpringRunner
    • ArrayList 类,用于创建一个空的列表。
    • List 接口:用于操作查询结果集合。

    代码如下:

    import com.oceanbase.example.batch.model.People;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.test.context.junit4.SpringRunner;

    import java.util.ArrayList;
    import java.util.List;
  2. 定义 AddPeopleWriterTest 类。

    通过使用 SpringBootTest 注解和 SpringRunner 运行器,进行 Spring Boot 的集成测试,并使用 @ComponentScan 注解指定要扫描的包路径。

    1. 使用 @Autowired 注入实例。 使用 @Autowired 注解自动注入 addPeopleWriterJdbcTemplate 实例。

      代码如下:

      @Autowired
      private AddPeopleWriter addPeopleWriter;
      @Autowired
      private JdbcTemplate jdbcTemplate;
    2. 使用 @Test 测试数据的插入和输出。

      1. 插入数据到 people 表。 首先,创建一个空的 People 对象列表 peopleList。然后,创建两个 People 对象 person1person2,并设置它们的名称和年龄属性。接着,将这两个 People 对象添加到 peopleList 列表中。之后,调用 addPeopleWriterwrite 方法,将 peopleList 作为参数传递给该方法,用于将这些 People 对象写入数据库。

        代码如下:

           List<People> peopleList = new ArrayList<>();
        People person1 = new People();
        person1.setName("zhangsan");
        person1.setAge(27);
        peopleList.add(person1);
        People person2 = new People();
        person2.setName("lisi");
        person2.setAge(35);
        peopleList.add(person2);
        addPeopleWriter.write(peopleList);
      2. 输出 people 表中的数据。 首先使用 JdbcTemplate 执行了一条查询语句 SELECT * FROM people,并使用 lambda 表达式处理查询结果。在 lambda 表达式中,使用 rs.getStringrs.getInt 方法获取查询结果集中的字段值,并将字段值设置到一个新创建的 People 对象中。将每个新创建的 People 对象添加到一个结果列表 result 中。接下来打印了一行提示信息 people 表中的数据: 然后使用 for 循环遍历 result 列表中的每个 People 对象,并使用 System.out.println 打印输出每个对象的内容。最后打印了一条作业执行完成的信息。

        代码如下:

           List<People> result = jdbcTemplate.query("SELECT * FROM people", (rs, rowNum) -> {
        People person = new People();
        person.setName(rs.getString("name"));
        person.setAge(rs.getInt("age"));
        return person;
        });

        System.out.println("people 表中的数据:");
        for (People person : result) {
        System.out.println(person);
        }

        // 输出作业执行完成后的信息
        System.out.println("Batch Job execution completed.");

完整的代码展示

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.oceanbase</groupId>
<artifactId>java-oceanbase-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>java-oceanbase-springbatch</name>
<description>Demo project for Spring Batch</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jdbc</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

application.properties

#configuration database

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:oceanbase://host:port/schema_name?characterEncoding=utf-8
spring.datasource.username=user_name
spring.datasource.password=

# JPA
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update

# Spring Batch
spring.batch.job.enabled=false

#
logging.level.org.springframework=INFO
logging.level.com.example=DEBUG

BatchApplication.java

package com.oceanbase.example.batch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}

public void runBatchJob() {
}
}

BatchConfig.java

package com.oceanbase.example.batch.config;

import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import com.oceanbase.example.batch.processor.AddPeopleDescProcessor;
import com.oceanbase.example.batch.writer.AddDescPeopleWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;
//import javax.activation.DataSource;

@Configuration
@EnableBatchProcessing
@SpringBootApplication
@ComponentScan("com.oceanbase.example.batch.writer")
@EnableAutoConfiguration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private DataSource dataSource;// 使用 Spring Boot 自动配置提供的默认 dataSource



@Bean
public ItemReader<People> peopleReader() {
JdbcCursorItemReader<People> reader = new JdbcCursorItemReader<>();
reader.setDataSource((javax.sql.DataSource) dataSource);
reader.setRowMapper(new BeanPropertyRowMapper<>(People.class));
reader.setSql("SELECT * FROM people");
return reader;
}

@Bean
public ItemProcessor<People, PeopleDESC> addPeopleDescProcessor() {
return new AddPeopleDescProcessor();
}

@Bean
public ItemWriter<PeopleDESC> addDescPeopleWriter() {
return new AddDescPeopleWriter();
}

@Bean
public Step step1(ItemReader<People> reader, ItemProcessor<People, PeopleDESC> processor,
ItemWriter<PeopleDESC> writer) {
return stepBuilderFactory.get("step1")
.<People, PeopleDESC>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}

@Bean
public Job importJob(Step step1) {
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
}

People.java

package com.oceanbase.example.batch.model;

public class People {
private String name;
private int age;

// getters and setters

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "People [name=" + name + ", age=" + age + "]";
}
// Getters and setters
}

PeopleDESC.java

package com.oceanbase.example.batch.model;

public class PeopleDESC {
private String name;
private int age;
private String desc;
private int id;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

@Override
public String toString() {
return "PeopleDESC [name=" + name + ", age=" + age + ", desc=" + desc + "]";
}
}

AddPeopleDescProcessor.java

package com.oceanbase.example.batch.processor;

import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.springframework.batch.item.ItemProcessor;


public class AddPeopleDescProcessor implements ItemProcessor<People, PeopleDESC> {
@Override
public PeopleDESC process(People item) throws Exception {
PeopleDESC desc = new PeopleDESC();
desc.setName(item.getName());
desc.setAge(item.getAge());
desc.setDesc("This is " + item.getName() + " with age " + item.getAge());
return desc;
}
}

AddDescPeopleWriter.java

package com.oceanbase.example.batch.writer;

import com.oceanbase.example.batch.model.PeopleDESC;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

import java.util.List;

public class AddDescPeopleWriter implements ItemWriter<PeopleDESC> {
@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void write(List<? extends PeopleDESC> items) throws Exception {
// 先删除可能存在的表
jdbcTemplate.execute("DROP TABLE people_desc");
// 建表语句
String createTableSql = "CREATE TABLE people_desc (id INT PRIMARY KEY, name VARCHAR2(255), age INT, description VARCHAR2(255))";
jdbcTemplate.execute(createTableSql);
for (PeopleDESC item : items) {
String sql = "INSERT INTO people_desc (id, name, age, description) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(sql, item.getId(), item.getName(), item.getAge(), item.getDesc());
}
}
}

AddPeopleWriter.java

package com.oceanbase.example.batch.writer;

import com.oceanbase.example.batch.model.People;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class AddPeopleWriter implements ItemWriter<People> {
@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void write(List<? extends People> items) throws Exception {
// 先删除可能存在的表
jdbcTemplate.execute("DROP TABLE people");
// 建表语句
String createTableSql = "CREATE TABLE people (name VARCHAR2(255), age INT)";
jdbcTemplate.execute(createTableSql);
for (People item : items) {
String sql = "INSERT INTO people (name, age) VALUES (?, ?)";
jdbcTemplate.update(sql, item.getName(), item.getAge());
}
}
}

BatchConfigTest.java

package com.oceanbase.example.batch.config;

import com.oceanbase.example.batch.writer.AddDescPeopleWriter;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import javax.batch.runtime.BatchStatus;
import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BatchConfigTest {

@Test
public void testJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobParam", UUID.randomUUID().toString())
.toJobParameters();

JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
Assert.assertEquals(BatchStatus.COMPLETED.toString(), jobExecution.getStatus().toString());
}

@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

private class JobLauncherTestUtils {

public JobExecution launchJob(JobParameters jobParameters) throws Exception {
return jobLauncher.run(job, jobParameters);
}
}
}

AddPeopleDescProcessorTest.java

package com.oceanbase.example.batch.processor;

import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AddPeopleDescProcessorTest {
@Autowired
private AddPeopleDescProcessor processor;

@Test
public void testProcess() throws Exception {
People people = new People();
// people.setName("John");
// people.setAge(25);

PeopleDESC desc = processor.process(people);

// Assert.assertEquals("John", desc.getName());
// Assert.assertEquals(25, desc.getAge());
// Assert.assertEquals("This is John with age 25", desc.getDesc());
}
}

AddDescPeopleWriterTest.java

package com.oceanbase.example.batch.writer;

import com.oceanbase.example.batch.model.PeopleDESC;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AddDescPeopleWriterTest {
@Autowired
private AddDescPeopleWriter writer;
@Autowired
private JdbcTemplate jdbcTemplate;

@Test
public void testWrite() throws Exception {


// 插入数据到people_desc表
List<PeopleDESC> peopleDescList = new ArrayList<>();
PeopleDESC desc1 = new PeopleDESC();
desc1.setId(1);
desc1.setName("John");
desc1.setAge(25);
desc1.setDesc("This is John with age 25");
peopleDescList.add(desc1);
PeopleDESC desc2 = new PeopleDESC();
desc2.setId(2);
desc2.setName("Alice");
desc2.setAge(30);
desc2.setDesc("This is Alice with age 30");
peopleDescList.add(desc2);
writer.write(peopleDescList);

String selectSql = "SELECT COUNT(*) FROM people_desc";
int count = jdbcTemplate.queryForObject(selectSql, Integer.class);
Assert.assertEquals(2, count);

// 输出people_desc表中的数据
List<PeopleDESC> resultDesc = jdbcTemplate.query("SELECT * FROM people_desc", (rs, rowNum) -> {
PeopleDESC desc = new PeopleDESC();
desc.setId(rs.getInt("id"));
desc.setName(rs.getString("name"));
desc.setAge(rs.getInt("age"));
desc.setDesc(rs.getString("description"));
return desc;
});

System.out.println("people_desc 表中的数据:");
for (PeopleDESC desc : resultDesc) {
System.out.println(desc);
}

// 输出作业执行完成后的信息
System.out.println("Batch Job execution completed.");
}
}

AddPeopleWriterTest.java

package com.oceanbase.example.batch.writer;

import com.oceanbase.example.batch.model.People;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootApplication
@ComponentScan("com.oceanbase.example.batch.writer")
public class AddPeopleWriterTest {

@Autowired
private AddPeopleWriter addPeopleWriter;
@Autowired
private JdbcTemplate jdbcTemplate;

@Test
public void testWrite() throws Exception {
// 插入数据到people表
List<People> peopleList = new ArrayList<>();
People person1 = new People();
person1.setName("zhangsan");
person1.setAge(27);
peopleList.add(person1);
People person2 = new People();
person2.setName("lisi");
person2.setAge(35);
peopleList.add(person2);
addPeopleWriter.write(peopleList);

// 查询并输出结果
List<People> result = jdbcTemplate.query("SELECT * FROM people", (rs, rowNum) -> {
People person = new People();
person.setName(rs.getString("name"));
person.setAge(rs.getInt("age"));
return person;
});

System.out.println("people 表中的数据:");
for (People person : result) {
System.out.println(person);
}

// 输出作业执行完成后的信息
System.out.println("Batch Job execution completed.");
}
}

相关文档

更多 OceanBase Connector/J 的信息,请参见 OceanBase JDBC 驱动程序