SpringBatch 连接 seekdb 示例程序
本文将介绍如何使用 SpringBatch 框架和 seekdb 构建一个应用程序,实现创建表、插入数据和查询数据等基本操作。
点击下载 java-oceanbase-springbatch 示例工程
前提条件
- 您已安装 seekdb。
- 您已安装 JDK 1.8 和 Maven。
- 您已安装 IntelliJ IDEA。
本文档运行代码使用的工具是 IntelliJ IDEA 2021.3.2 (Community Edition) 版本,您也可以根据个人喜好选择适合自己的工具运行示例代码。
操作步骤
本文中给出的操作步骤是基于 Windows 环境生成。如果您使用的是其他操作系统环境或编译器,那么操作步骤可能会略有不同。
- 获取 seekdb 连接串。
- 导入
java-oceanbase-springbatch项目到 IDEA 中。 - 修改
java-oceanbase-springbatch项目中的数据库连接信息。 - 运行
java-oceanbase-springbatch项目。
步骤一:获取 seekdb 连接串
-
联系 seekdb 部署人员或者管理员获取相应的数据库连接串。
mysql -hxx.xx.xx.xx -P2881 -uroot -p**** -A -
根据已部署的 seekdb 填写下面 URL 的对应信息。
信息在
application.properties文件中需要这里的 URL 信息。jdbc:oceanbase://host:port/schema_name?user=$user_name&password=$password&characterEncoding=utf-8参数说明:
host:提供 seekdb 的连接 IP。应该被实际的 IP 替换,也可以使用本地 IP 及 127.0.0.1。port:提供 seekdb 接端口。应该被实际的端口替换,默认是 2881,在部署 seekdb 时可自定义。schema_name:需要访问的 Schema 名称。user_name:通过-u参数指定,格式为 用户。默认用户为root。password:提供账户密码。characterEncoding:提供字符编码。
更多 URL 参数说明信息,请参见 数据库 URL。
步骤二:导入 java-oceanbase-springbatch 项目到 IDEA 中
-
打开 IntelliJ IDEA,选择 File > Open... 选项。

-
在弹出的 Open File or Project 窗口中,选择对应的项目文件,单击 OK 完成项目文件导入。
-
IntelliJ IDEA 将会自动识别项目中的各类文件,并在 Project 工具窗口中,可以查看项目的目录结构、文件列表、模块列表、依赖关系等信息。Project 工具窗口通常位于 IntelliJ IDEA 界面的最左侧,默认情况下是打开的。如果 Project 工具窗口被关闭了,可以通过点击菜单栏中的 View > Tool Windows > Project 或者使用快捷键 Alt + 1 来重新打开它。
信息当使用 IntelliJ IDEA 导入项目时,IntelliJ IDEA 会自动检测项目中的 pom.xml 文件,并根据文件中描述的依赖关系自动下载所需的依赖库,并将它们添加到项目中。
-
查看项目情况。

步骤三:修改 java-oceanbase-springbatch 项目中的数据库连接信息
根据 步骤一:获取 seekdb 连接串 中的信息修改 application.properties 文件中的数据库连接信息。
示例如下:
- 数据库驱动的名称为:
com.mysql.cj.jdbc.Driver - seekdb 的 IP 地址为
10.10.10.1。 - 访问端口使用的是 2881。
- 需要访问的 Schema 名称为
test。 - 连接账户是
root。 - 密码是
******。
示例代码如下:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:oceanbase://10.10.10.1:2881/test?characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=******
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.batch.job.enabled=false
logging.level.org.springframework=INFO
logging.level.com.example=DEBUG
步骤四:运行 java-oceanbase-springbatch 项目
-
运行
AddDescPeopleWriterTest.java文件。- 在项目结构中找到 src > test > java 中找到
AddDescPeopleWriterTest.java文件。 - 在工具菜单栏中选择 Run > Run... > AddDescPeopleWriterTest.testWrite,或直接单击右上角绿色三角形运行。
- 通过 IDEA 的控制台来查看项目的日志信息和输出结果。
people_desc 表中的数据:
PeopleDESC [name=John, age=25, desc=This is John with age 25]
PeopleDESC [name=Alice, age=30, desc=This is Alice with age 30]
Batch Job execution completed. - 在项目结构中找到 src > test > java 中找到
-
运行
AddPeopleWriterTest.java文件。- 在项目结构中找到 src > test > java 中找到
AddDescPeopleWriterTest.java文件。 - 在工具菜单栏中选择 Run > Run... > AddPeopleWriterTest.testWrite,或直接单击右上角绿色三角形运行。
- 通过 IDEA 的控制台来查看项目的日志信息和输出结果。
people 表中的数据:
People [name=zhangsan, age=27]
People [name=lisi, age=35]
Batch Job execution completed. - 在项目结构中找到 src > test > java 中找到
常见问题
1. 连接超时
如果遇到连接超时问题,可以在 JDBC URL 中配置连接超时参数:
jdbc:mysql://host:port/database?connectTimeout=30000&socketTimeout=60000
2. 字符集问题
为确保正确的字符编码,在 JDBC URL 中设置正确的字符集参数:
jdbc:mysql://host:port/database?characterEncoding=utf8&useUnicode=true
3. SSL 连接
要启用与 seekdb 的 SSL 连接,在 JDBC URL 中添加以下参数:
jdbc:mysql://host:port/database?useSSL=true&requireSSL=true
4. 账号密码中的特殊字符
如果用户名或密码包含特殊字符(如 #),需要进行 URL 编码:
String encodedPassword = URLEncoder.encode(password, "UTF-8");
使用 MySQL Connector/J 8.x 时,确保账号密码不包含井号(#)。否则,可能会遇到连接错误。
项目代码介绍
点击 java-oceanbase-springbatch 下载项目代码,这是一个名为 java-oceanbase-springbatch 的压缩包。
解压后,得到一个名为 java-oceanbase-springbatch 的文件夹。目录结构如下所示:
│ pom.xml
│
├─.idea
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─oceanbase
│ │ │ └─example
│ │ │ └─batch
│ │ │ │──BatchApplication.java
│ │ │ │
│ │ │ ├─config
│ │ │ │ └─BatchConfig.java
│ │ │ │
│ │ │ ├─model
│ │ │ │ ├─People.java
│ │ │ │ └─PeopleDESC.java
│ │ │ │
│ │ │ ├─processor
│ │ │ │ └─AddPeopleDescProcessor.java
│ │ │ │
│ │ │ └─writer
│ │ │ ├─AddDescPeopleWriter.java
│ │ │ └─AddPeopleWriter.java
│ │ │
│ │ └─resources
│ │ └─application.properties
│ │
│ └─test
│ └─java
│ └─com
│ └─oceanbase
│ └─example
│ └─batch
│ ├─config
│ │ └─BatchConfigTest.java
│ │
│ ├─processor
│ │ └─AddPeopleDescProcessorTest.java
│ │
│ └─writer
│ ├─AddDescPeopleWriterTest.java
│ └─AddPeopleWriterTest.java
│
└─target
文件说明:
pom.xml:Maven 项目的配置文件,包含了项目的依赖、插件、构建等信息。.idea:IDE(集成开发环境)中使用的目录,用于存储项目相关的配置信息。src:通常用于表示项目中存放源代码的目录。main: 存放主要的源代码和资源文件的目录。java: 存放 Java 源代码的目录。com.oceanbase.example.batch:包名。BatchApplication.java:这是应用程序的入口类,包含了应用程序的主方法。config:这是配置类文件夹,包含了应用程序的配置类。BatchConfig.java:这是应用程序的配置类,用于配置应用程序的一些属性和行为。model:这是模型类文件夹,包含了应用程序的数据模型类。People.java:这是一个人员数据模型类。PeopleDESC.java:这是一个人员DESC数据模型类。processor:这是处理器类文件夹,包含了应用程序的处理器类。AddPeopleDescProcessor.java:这是一个添加人员DESC信息的处理器类。writer:这是写入器类文件夹,包含了应用程序的写入器类。AddDescPeopleWriter.java:这是一个写入人员DESC信息的写入器类。AddPeopleWriter.java:这是一个写入人员信息的写入器类。resources:这是资源文件夹,包含了应用程序的配置文件和其他静态资源文件。application.properties:这是应用程序的配置文件,用于配置应用程序的属性。test: 存放测试代码和资源文件的目录。BatchConfigTest.java:这是应用程序配置类的测试类。AddPeopleDescProcessorTest.java:这是添加人员DESC处理器的测试类。AddDescPeopleWriterTest.java:这是写入人员DESC信息的写入器的测试类。AddPeopleWriterTest.java:这是写入人员信息的写入器的测试类。target: 存放编译后的 Class 文件、Jar 包等文件的目录。
pom.xml 代码介绍
如果您只是想验证示例,那么请使用默认代码,无需修改。您也可以按照以下讲解,根据自己的需求修改 pom.xml 文件。
pom.xml 配置文件内容如下:
-
文件声明语句。
声明本文件是一个 XML 文件,使用的 XML 版本是
1.0,字符编码方式是UTF-8。代码如下:
<?xml version="1.0" encoding="UTF-8"?> -
配置 POM 的命名空间和 POM 模型版本。
- 通过
xmlns指定 POM 的命名空间为http://maven.apache.org/POM/4.0.0。 - 通过
xmlns:xsi指定 XML 命名空间为http://www.w3.org/2001/XMLSchema-instance。 - 通过
xsi:schemaLocation指定 POM 的命名空间为http://maven.apache.org/POM/4.0.0,并指定 POM 的 XSD 文件的位置为https://maven.apache.org/xsd/maven-4.0.0.xsd。 - 通过
<modelVersion>元素指定该 POM 文件使用的 POM 模型版本为4.0.0。
代码如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
</project> - 通过
-
配置父项信息。
- 通过
<groupId>指定父项标识为org.springframework.boot。 - 通过
<artifactId>指定父项依赖为spring-boot-starter-parent。 - 通过
<version>指定父项的版本号为2.7.11。 - 通过
relativePath表示父项的路径为空。
代码如下:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/>
</parent> - 通过
-
配置基本信息。
- 通过
<groupId>指定项目标识为com.oceanbase。 - 通过
<artifactId>指定项目依赖为java-oceanbase-springboot。 - 通过
<version>指定项目的版本号为0.0.1-SNAPSHOT。 - 通过
description介绍项目信息为Demo project for Spring Batch。
代码如下:
<groupId>com.oceanbase</groupId>
<artifactId>java-oceanbase-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>java-oceanbase-springbatch</name>
<description>Demo project for Spring Batch</description> - 通过
-
配置 java 版本。
指定项目使用的 Java 版本为 1.8。
代码如下:
<properties>
<java.version>1.8</java.version>
</properties> -
配置核心依赖。
-
指定依赖项所属的组织为
org.springframework.boot,名称为spring-boot-starter,通过该依赖可以使用 Spring Boot 默认支持的组件依赖,支持 Web、数据处理、安全、Test 等功能。代码如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency> -
指定依赖项所属的组织为
org.springframework.boot,名称为spring-boot-starter-jdbc,通过该依赖可以使用 Spring Boot 提供的 JDBC 相关功能,如连接池、数据源配置等。代码如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency> -
指定依赖项所属的组织为
org.springframework.boot,名称为spring-boot-starter-test,作用范围为test,通过该依赖可以使用 Spring Boot 提供的测试框架和工具,如 JUnit、Mockito、Hamcrest 等。代码如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency> -
指定依赖项所属的组织为
com.oceanbase,名称为oceanbase-client,版本号为2.4.12,通过该依赖可以使用 seekdb 提供的客户端功能,如连接、查询、事务等。代码如下:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.12</version>
</dependency> -
指定依赖项所属的组织为
org.springframework.boot,名称为spring-boot-starter-batch,通过该依赖可以使用 Spring Boot 提供的批处理功能。代码如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency> -
指定依赖项所属的组织为
org.springframework.boot,名称为spring-boot-starter-data-jpa,通过该依赖可以使用 JPA 进行数据访问的必要依赖和配置,Spring Boot Starter Data JPA 是一个 Spring Boot 的启动器。代码如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency> -
指定依赖项所属的组织为
org.apache.tomcat,名称为tomcat-jdbc,通过该依赖可以使用 Tomcat 提供的 JDBC 连接池功能,包括连接池的配置、连接的获取和释放、连接的管理等。代码如下:
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jdbc</artifactId>
</dependency> -
指定依赖项所属测试架构为
junit,名称为junit,版本号为4.10,作用范围为test,通过该依赖可以用于添加 JUnit 单元测试依赖项的配置。代码如下:
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency> -
指定依赖项所属的组织为
javax.activation,名称为javax.activation-api,版本号为1.2.0,通过该依赖可以引入 Java Activation Framework(JAF)库。代码如下:
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
</dependency> -
指定依赖项所属的组织为
jakarta.persistence,名称为jakarta.persistence-api,版本号为2.2.3,通过该依赖可以添加 Jakarta Persistence API 依赖项的配置。 代码如下:<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
<version>2.2.3</version>
</dependency>
-
-
配置 Maven 插件。
指定依赖项所属的组织为
org.springframework.boot,名称为spring-boot-maven-plugin,该插件用于将 Spring Boot 应用程序打包成可执行的 JAR 包或 WAR 包,并且可以直接运行。代码如下:
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
application.properties 文件介绍
application.properties 文件用于配置数据库连接和其他相关的配置项。如数据库驱动程序、连接 URL、用户名和密码等。还包括了一些关于 JPA(Java Persistence API)和 Spring Batch 的配置,以及日志级别的设置。
-
数据库连接配置。
- 通过
spring.datasource.driver指定数据库驱动程序为com.mysql.cj.jdbc.Driver,用于与 seekdb 建立连接。 - 通过
spring.datasource.url指定连接数据库的 URL。 - 通过
spring.datasource.username指定连接数据库的用户名。 - 通过
spring.datasource.password指定连接数据库的密码。
代码如下:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:oceanbase://host:port/schema_name?characterEncoding=utf-8
spring.datasource.username=user_name
spring.datasource.password=****** - 通过
-
JPA 配置。
- 通过
spring.jpa.show-sql指定是否在日志中显示 SQL 语句,设置为true,表示显示 SQL 语句。 - 通过
spring.jpa.hibernate.ddl-auto指定 Hibernate 的 DDL 操作行为,这里设置为update,表示在应用程序启动时自动更新数据库结构。
代码如下:
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update - 通过
-
Spring Batch 配置:
通过
spring.batch.job.enabled指定是否启用 Spring Batch 作业,这里设置为false,表示禁用自动执行批处理作业。代码如下:
spring.batch.job.enabled=false信息在 Spring Batch 中,
spring.batch.job.enabled属性的作用是控制批处理作业的执行行为。spring.batch.job.enabled=true(默认值):表示在 Spring Boot 应用启动时,自动运行所有定义的批处理作业。这意味着在应用启动时,Spring Batch 会自动 发现并执行所有定义的作业。spring.batch.job.enabled=false:表示禁用自动执行批处理作业。这通常用于开发或测试环境,或者当你希望手动控制作业执行时。设为false后,作业不会在应用启动时自动执行,您可以通过其他方式(如 REST 接口、命令行等)手动触发作业。
总结来说,设置
spring.batch.job.enabled=false可以帮助您避免在应用启动时自动运行作业,提供更大的灵活性来控制何时执行批处理作业。 -
日志配置:
- 通过
logging.level.org.springframework指定 Spring 框架的日志级别为INFO。 - 通过
logging.level.com.example指定应用程序自定义代码的日志级别为DEBUG。
代码如下:
logging.level.org.springframework=INFO
logging.level.com.example=DEBUG - 通过
BatchApplication.java 文件介绍
BatchApplication.java 文件是 Spring Boot 应用程序的入口文件。
BatchApplication.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包 含以下接口和类:
SpringApplication类:用于启动 Spring Boot 应用程序。SpringBootApplication注解:用于标记该类为 Spring Boot 应用程序的入口。
代码如下:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; -
定义
BatchApplication类。使用
@SpringBootApplication注解标记BatchApplication类为 Spring Boot 应用程序的入口。在BatchApplication类定义了一个静态的main方法作为应用程序的入口点。在该方法中,使用SpringApplication.run方法启动 Spring Boot 应用程序。再定义了一个名为runBatchJob的方法,用于运行批处理作业。代码如下:
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
public void runBatchJob() {
}
}
BatchConfig.java 文件介绍
BatchConfig.java 文件用于配置批处理作业的步骤、读取器、处理器和写入器等组件。
BatchConfig.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
People类:用于存储从数据库中读取的人员信息。PeopleDESC类:用于存储对人员信息进行转换或处理后的描述信息。AddPeopleDescProcessor类:将读取到的People对象转换为PeopleDESC对象,ItemProcessor接口的实现类。AddDescPeopleWriter类:将PeopleDESC对象写入到目标位置,ItemWriter接口的实现类。Job接口:表示一个批处理作业。Step接口:表示作业中的一个步骤。EnableBatchProcessing注解:Spring Batch 的配置注解,用于启用和配置 Spring Batch 处理功能。JobBuilderFactory类:用于创建和配置作业。StepBuilderFactory类:用于创建和配置步骤。RunIdIncrementer类:Spring Batch 的运行 ID(Run ID)自增器,用于在每次运行作业时增加运行的 ID。ItemProcessor接口:用于对读取到的项进行处理或转换。ItemReader接口:用于从数据源中读取项。ItemWriter接口:用于将处理或转换后的项写入到指定的目标位置。JdbcCursorItemReader类:用于从数据库中读取数据并返回游标结果集。Autowired注解:用于进行依赖注入。Bean注解:用于创建和配置 Bean。ComponentScan注解:用于指定要进行组件扫描的包或类。Configuration注解:用于将类标记为配置类。EnableAutoConfiguration注解:用于启用 Spring Boot 的自动配置。SpringBootApplication注解:用于标记该类为 Spring Boot 应用程序的入口。DataSource接口:用于表示数据库的连接。
代码如下:
import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import com.oceanbase.example.batch.processor.AddPeopleDescProcessor;
import com.oceanbase.example.batch.writer.AddDescPeopleWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import javax.sql.DataSource; -
定义
BatchConfig类。这是一个简单的 Spring Batch 批处理作业。它定义了数据的读取、处理和写入方式,并将这些步骤封装为一个作业。使用 Spring Batch 的注解和自动配置功能,通过配置类中的各个
@Bean方法,可以创建相应的组件实例,并在step1中使用这些组件完成数据的读取、处理和写入。- 通过
@Configuration表示这个类是一个配置类。 - 通过
@EnableBatchProcessing启用 Spring Batch 的处理功能,该注解会自动创建必要的bean,如JobRepository、JobLauncher等。 - 通过
@SpringBootApplication用于 Spring Boot 应用的主类注解,是 Spring Boot 应用的启动点。 - 通过
@ComponentScan指定要进行组件扫描的包,告诉 Spring 在这个包和子包中扫描和注册所有的组件。 - 通过
@EnableAutoConfiguration自动配置 Spring Boot 应用程序的基础设施。
代码如下:
@Configuration
@EnableBatchProcessing
@SpringBootApplication
@ComponentScan("com.oceanbase.example.batch.writer")
@EnableAutoConfiguration
public class BatchConfig {
}-
定义
@Autowired注解。通过
@Autowired注解将JobBuilderFactory、StepBuilderFactory和DataSource注入到BatchConfig类中的成员变量中。JobBuilderFactory用于创建和配置作业(Job)的工厂类,StepBuilderFactory用于创建和配置步骤(Step)的工厂类,DataSource用于获取数据库连接的接口。代码如下:
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource; -
定义
@Bean注解。使用
@Bean注解定义了几个方法,用于创建批处理作业的读取器、处理器、写入器、步骤和作业。-
通过
peopleReader方法创建一个ItemReader组件实例,该组件使用JdbcCursorItemReader来读取数据库中的People对象数据。设置数据源dataSource、设置RowMapper来将数据库行映射为People对象,设置 SQL 查询语句为SELECT * FROM people。 -
通过
addPeopleDescProcessor方法创建一个ItemProcessor组件实例,该组件使用AddPeopleDescProcessor来处理People对象,返回转换为PeopleDESC对象。 -
通过
addDescPeopleWriter方法创建一个ItemWriter组件实例,该组件使用AddDescPeopleWriter来将PeopleDESC对象写入到目标位置。 -
通过
step1方法创建一个Step组件实例,该步骤的名称为step1,通过stepBuilderFactory.get获取步骤构建器,设置读取器为ItemReader组件,设置处理器为ItemProcessor组件,设置写入器为ItemWriter组件,设置chunk大小为10,最后调用build构建并返回配置完成的Step。 -
通过
importJob方法创建一个Job组件实例,该作业的名称为importJob,通过jobBuilderFactory.get获取作业构建器,设置增量器为RunIdIncrementer,设定作业flow的最初步骤为Step,最后调用build构建并返回配置完成的Job。代码如下:
@Bean
public ItemReader<People> peopleReader() {
JdbcCursorItemReader<People> reader = new JdbcCursorItemReader<>();
reader.setDataSource((javax.sql.DataSource) dataSource);
reader.setRowMapper(new BeanPropertyRowMapper<>(People.class));
reader.setSql("SELECT * FROM people");
return reader;
}
@Bean
public ItemProcessor<People, PeopleDESC> addPeopleDescProcessor() {
return new AddPeopleDescProcessor();
}
@Bean
public ItemWriter<PeopleDESC> addDescPeopleWriter() {
return new AddDescPeopleWriter();
}
@Bean
public Step step1(ItemReader<People> reader, ItemProcessor<People, PeopleDESC> processor,
ItemWriter<PeopleDESC> writer) {
return stepBuilderFactory.get("step1")
.<People, PeopleDESC>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job importJob(Step step1) {
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
-
- 通过
People.java 文件介绍
People.java 文件是创建一个 People 类的数据模型,表示一个人的信息。该类包含了两个私有成员变量 name 和 age,以及相应的 getter 和 setter 方法。最后重写 toString 方法用于打印对象的信息。其中 name 表示人的姓名,age 表示人的年龄。通过 getter 和 setter 方法可以获取和设置这些属性的值。
该类的作用是为批处理程序的输入和输出提供一种存储和传递数据的方式。在批处理的读取和写入操作中,使用 People 对象来存储数据,通过 setter 方法设置数据,通过 getter 方法获取数据。
代码如下:
public class People {
private String name;
private int age;
// getters and setters
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "People [name=" + name + ", age=" + age + "]";
}
// Getters and setters
}
PeopleDESC.java 文件介绍
PeopleDESC.java 文件是创建一个 PeopleDESC 类的数据模型,用于表示人员的信息,PeopleDESC 类有四个属性:name、age、desc 和 id,分别表示人的姓名、年龄、描述和标识。该类包含了相应的 getter 和 setter 方法来访问和设置属性的值。重写 toString 方法用于返回类的字符串表示形式,包含姓名、年龄和描述。
与 People 类类似,PeopleDESC 类也用于在批处理程序的输入和输出中存储和传递数据。
代码如下:
public class PeopleDESC {
private String name;
private int age;
private String desc;
private int id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "PeopleDESC [name=" + name + ", age=" + age + ", desc=" + desc + "]";
}
}
AddPeopleDescProcessor.java 文件介绍
AddPeopleDescProcessor.java 文件定义一个名为 AddPeopleDescProcessor 的类,实现 ItemProcessor 接口,用于将 People 对象转换为 PeopleDESC 对象。
AddPeopleDescProcessor.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
People类:用于存储从数据库中读取的人员信息。PeopleDESC类:用于存储对人员信息进行转换或处理后的描述信息。ItemProcessor接口:用于对读取到的项进行处理或转换。
代码如下:
import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.springframework.batch.item.ItemProcessor; -
定义
AddPeopleDescProcessor类。ItemProcessor接口的AddPeopleDescProcessor类用于将People对象转换为PeopleDESC对象,实现在批处理过程中对输入的数据的处理逻辑。在该类的
process方法中,首先创建一个PeopleDESC对象desc,然后通过item参数获取People对象的属性(name和age),将这些属性设置到desc对象中。同时,也为desc对象的desc属性赋值,赋值的逻辑为根据People对象的属性生成一段描述信息。最后,返回通过处理后的PeopleDESC对象。代码如下:
public class AddPeopleDescProcessor implements ItemProcessor<People, PeopleDESC> {
@Override
public PeopleDESC process(People item) throws Exception {
PeopleDESC desc = new PeopleDESC();
desc.setName(item.getName());
desc.setAge(item.getAge());
desc.setDesc("This is " + item.getName() + " with age " + item.getAge());
return desc;
}
}
AddDescPeopleWriter.java 文件介绍
AddDescPeopleWriter.java 文件实现了 ItemWriter 接口的 AddDescPeopleWriter 类,用于将 People 对象写入到数据库。
AddDescPeopleWriter.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
PeopleDESC类:用于存储对人员信息进行转换或处理后的描述信息。ItemWriter接口:用于将处理或转换后的项写入到指定的目标位置。Autowired注解:用于进行依赖注入。JdbcTemplate类:提供执行 SQL 语句的方法。List接口:用于操作查询结果集合。
代码如下:
import com.oceanbase.example.batch.model.PeopleDESC;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.List; -
定义
AddDescPeopleWriter类。-
使用
@Autowired注解将JdbcTemplate实例自动注入进来,在写入数据时使用该实例执行数据库操作。代码如下:
@Autowired
private JdbcTemplate jdbcTemplate; -
在
write方法中,对传入的List<? extends PeopleDESC>进行遍历,依次取出每个PeopleDESC对象。首先,执行 SQL 语句DROP TABLE people_desc,用于删除可能已存在的名为people_desc的表。然后,执行 SQL 语句CREATE TABLE people_desc (id INT PRIMARY KEY, name VARCHAR2(255), age INT, description VARCHAR2(255)),用于创建一个名为people_desc的表,表中包含了id、name、age和description四个列。接下来,使用 SQL 语句INSERT INTO people_desc (id, name, age, description) VALUES (?, ?, ?, ?),将每个PeopleDESC对象的属性值分别插入到people_desc表中。代码如下:
@Override
public void write(List<? extends PeopleDESC> items) throws Exception {
// 先删除可能存在的表
jdbcTemplate.execute("DROP TABLE people_desc");
// 建表语句
String createTableSql = "CREATE TABLE people_desc (id INT PRIMARY KEY, name VARCHAR2(255), age INT, description VARCHAR2(255))";
jdbcTemplate.execute(createTableSql);
for (PeopleDESC item : items) {
String sql = "INSERT INTO people_desc (id, name, age, description) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(sql, item.getId(), item.getName(), item.getAge(), item.getDesc());
}
}
-
AddPeopleWriter.java 文件介绍
AddPeopleWriter.java 文件实现了 ItemWriter 接口的 AddDescPeopleWriter 类,用于将 PeopleDESC 对象写入到数据库。
AddPeopleWriter.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
People类:用于存储从数据库中读取的人员信息。ItemWriter接口:用于将处理或转换后的项写入到指定的目标位置。Autowired注解:用于进行依赖注入。JdbcTemplate类:提供执行 SQL 语句的方法。Component注解:用于将该类标记为 Spring 组件。List接口:用于操作查询结果集合。
代码如下:
import com.oceanbase.example.batch.model.People;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.List; -
定义
AddPeopleWriter类。-
使用
@Autowired注解将JdbcTemplate实例自动注入进来,在写入数据时使用该实例执行数据库操作。代码如下:
@Autowired
private JdbcTemplate jdbcTemplate; -
在
write方法中,对传入的List<? extends People>进行遍历,依次取出每个People对象。首先,执行 SQL 语句DROP TABLE people,用于删除可能已存在的名为people的表。然后,执行 SQL 语句CREATE TABLE people (name VARCHAR2(255), age INT),用于创建一个名为people的表,表中包含了name和age两个列。接下来,使用 SQL 语句INSERT INTO people (name, age) VALUES (?, ?),将每个People对象的属性值分别插入到people表中。代码如下:
@Override
public void write(List<? extends People> items) throws Exception {
// 先删除可能存在的表
jdbcTemplate.execute("DROP TABLE people");
// 建表语句
String createTableSql = "CREATE TABLE people (name VARCHAR2(255), age INT)";
jdbcTemplate.execute(createTableSql);
for (People item : items) {
String sql = "INSERT INTO people (name, age) VALUES (?, ?)";
jdbcTemplate.update(sql, item.getName(), item.getAge());
}
}
-
BatchConfigTest.java 文件介绍
BatchConfigTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 Spring Batch 的作业配置。
BatchConfigTest.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
Assert类:用于断言测试结果。Test注解:用于标记测试方法。RunWith注解:用于指定测试运行器。Job接口:表示一个批处理作业。JobExecution类:用于表示批处理作业的执行。JobParameters类:用于表示批处理作业的参数。JobParametersBuilder类:用于构建批处理作业的参数。JobLauncher接口:用于启动批处理作业。Autowired注解:用于进行依赖注入。SpringBootTest注解:用于指定测试类为 Spring Boot 测试。SpringRunner类:用于指定测试运行器为 SpringRunner。
代码如下:
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.batch.runtime.BatchStatus;
import java.util.UUID; -
定义
BatchConfigTest类。通过使用
SpringBootTest注解和SpringRunner运行器,可以进行 Spring Boot 的集成测试。在testJob方法中,使用JobLauncherTestUtils辅助类启动批处理作业,并使用断言来验证作业的执行状态。-
使用
@Autowired注解自动注入JobLauncherTestUtils实例。代码如下:
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils; -
使用
@Test注解标记testJob方法为测试方法。在该方法中,首先创建JobParameters对象,然后使用jobLauncherTestUtils.launchJob方法启动批处理作业,并使用Assert.assertEquals方法断言作业的执行状态为COMPLETED。代码如下:
@Test
public void testJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobParam", "paramValue")
.toJobParameters();
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
Assert.assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
} -
使用
@Autowired注解自动注入JobLauncher实例。代码如下:
@Autowired
private JobLauncher jobLauncher; -
使用
@Autowired注解自动注入Job实例。代码如下:
@Autowired
private Job job; -
定义了一个名为
JobLauncherTestUtils的内部类,用于辅助启动批处理作业。在该类中定义了一个launchJob方法,用于启动批处理作业。在该方法中,使用jobLauncher.run方法启动作业,并返回作业的执行结果。代码如下:
private class JobLauncherTestUtils {
public JobExecution launchJob(JobParameters jobParameters) throws Exception {
return jobLauncher.run(job, jobParameters);
}
}
-
AddPeopleDescProcessorTest.java 文件介绍
AddPeopleDescProcessorTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 Spring Batch 的作业配置。
AddPeopleDescProcessorTest.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
People类:用于存储从数据库中读取的人员信息。PeopleDESC类:用于存储对人员信息进行转换或处理后的描述信息。Assert类:用于验证测试中的预期结果和实际结果是否一致。Test注解:用于标记测试方法。RunWith注解:用于指定测试运行器。Autowired注解:用于进行依赖注入。SpringBootTest注解:用于指定测试类为 Spring Boot 测试。SpringRunner类:用于指定测试运行器为 SpringRunner。
代码如下:
import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; -
定义
AddPeopleDescProcessorTest类。通过使用
SpringBootTest注解和SpringRunner运行器,进行 Spring Boot 的集成测试。-
使用
@Autowired注解自动注入AddPeopleDescProcessor实例。代码如下:
@Autowired
private AddPeopleDescProcessor processor; -
使用
@Test注解标记testProcess方法为测试方法。在该方法中,首先创建一个People对象,然后使用processor.process方法处理该对象,并将结果赋值给一个PeopleDESC对象。代码如下:
@Test
public void testProcess() throws Exception {
People people = new People();
people.setName("John");
people.setAge(25);
PeopleDESC desc = processor.process(people);
}
-
AddDescPeopleWriterTest.java 文件介绍
AddDescPeopleWriterTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 AddDescPeopleWriter 的写入逻辑。
AddDescPeopleWriterTest.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
PeopleDESC类:用于存储对人员信息进行转换或处理后的描述信息。Assert类:用于断言测试结果。Test注解:用于标记测试方法。RunWith注解:用于指定测试运行器。Autowired注解:用于进行依赖注入。SpringBootTest注解:用于指定测试类为 Spring Boot 测试。JdbcTemplate类:提供执行 SQL 语句的方法。SpringRunner类:用于指定测试运行器为 SpringRunner。ArrayList类,用于创建一个空的列表。List接口:用于操作查询结果集合。
代码如下:
import com.oceanbase.example.batch.model.PeopleDESC;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List; -
定义
AddDescPeopleWriterTest类。通过使用
SpringBootTest注解和SpringRunner运行器,进行 Spring Boot 的集成测试。-
使用
@Autowired注入实例。 使用@Autowired注解自动注入AddPeopleDescProcessor和JdbcTemplate实例。代码如下:
@Autowired
private AddDescPeopleWriter writer;
@Autowired
private JdbcTemplate jdbcTemplate; -
使用
@Test测试数据的插入和输出。 使用@Test注解标记testWrite方法为测试方法。在该方法中,首先创建一个空的peopleDescList列表,并向列表中添加两个PeopleDESC对象。然后使用writer.write方法将列表中的数据写入到数据库中。 接着使用jdbcTemplate执行查询语句,获取people_desc表中的数据,并使用断言语句验证数据的正确性。最后,将查询结果输出到控制台,并输出作业执行完成的信息。-
插入数据到
people_desc表。 首先创建了一个空的PeopleDESC对象列表peopleDescList。其次创建了两个PeopleDESC对象desc1和desc2,并分别设置了它们的属性值。将desc1和desc2添加到peopleDescList列表中。接下来调用了writer的write方法,将peopleDescList中的对象写入数据库中的people_desc表。然后使用JdbcTemplate执行了一条查询语句SELECT COUNT(*) FROM people_desc,获取了people_desc表中的记录数,并将结果赋值给变量count。最后使用Assert.assertEquals方法进行断言,验证count的值是否等于2。代码如下:
List<PeopleDESC> peopleDescList = new ArrayList<>();
PeopleDESC desc1 = new PeopleDESC();
desc1.setId(1);
desc1.setName("John");
desc1.setAge(25);
desc1.setDesc("This is John with age 25");
peopleDescList.add(desc1);
PeopleDESC desc2 = new PeopleDESC();
desc2.setId(2);
desc2.setName("Alice");
desc2.setAge(30);
desc2.setDesc("This is Alice with age 30");
peopleDescList.add(desc2);
writer.write(peopleDescList);
String selectSql = "SELECT COUNT(*) FROM people_desc";
int count = jdbcTemplate.queryForObject(selectSql, Integer.class);
Assert.assertEquals(2, count); -
输出
people_desc表中的数据。 首先使用JdbcTemplate执行了一条查询语句SELECT * FROM people_desc,并使用lambda表达式处理查询结果。在lambda表达式中,使用rs.getInt、rs.getString等方法获取查询结果集中的字段值,并将字段值设置到一个新创建的PeopleDESC对象中。将每个新创建的PeopleDESC对象添加到一个结果列表resultDesc中。之后打印了一行提示信息people_desc 表中的数据:,然后使用for循环遍历resultDesc列表中的每个PeopleDESC对象,并使用System.out.println打印输出每个对象的内容。最后打印了一条作业执行完成的信息。代码如下:
List<PeopleDESC> resultDesc = jdbcTemplate.query("SELECT * FROM people_desc", (rs, rowNum) -> {
PeopleDESC desc = new PeopleDESC();
desc.setId(rs.getInt("id"));
desc.setName(rs.getString("name"));
desc.setAge(rs.getInt("age"));
desc.setDesc(rs.getString("description"));
return desc;
});
System.out.println("people_desc 表中的数据:");
for (PeopleDESC desc : resultDesc) {
System.out.println(desc);
}
// 输出作业执行完成后的信息
System.out.println("Batch Job execution completed.");
-
-
AddPeopleWriterTest.java 文件介绍
AddPeopleWriterTest.java 文件是一个使用 JUnit 进行测试的类,用于测试 AddPeopleWriterTest 的写入逻辑。
AddPeopleWriterTest.java 文件的代码主要包括以下几个部分:
-
引用其他类和接口。
声明当前文件包含以下接口和类:
People类:用于存储从数据库中读取的人员信息。Test注解:用于标记测试方法。RunWith注解:用于指定测试运行器。Autowired注解:用于进行依赖注入。SpringBootApplication注解:用于标记该类为 Spring Boot 应用程序的入口。SpringBootTest注解:用于指定测试类为 Spring Boot 测试。ComponentScan注解:用于指定要进行组件扫描的包或类。JdbcTemplate类:提供执行 SQL 语句的方法。SpringRunner类:用于指定测试运行器为SpringRunner。ArrayList类,用于创建一个空的列表。List接口:用于操作查询结果集合。
代码如下:
import com.oceanbase.example.batch.model.People;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List; -
定义
AddPeopleWriterTest类。通过使用
SpringBootTest注解和SpringRunner运行器,进行 Spring Boot 的集成测试,并使用@ComponentScan注解指定要扫描的包路径。-
使用
@Autowired注入实例。 使用@Autowired注解自动注入addPeopleWriter和JdbcTemplate实例。代码如下:
@Autowired
private AddPeopleWriter addPeopleWriter;
@Autowired
private JdbcTemplate jdbcTemplate; -
使用
@Test测试数据的插入和输出。-
插入数据到
people表。 首先,创建一个空的People对象列表peopleList。然后,创建两个People对象person1和person2,并设置它们的名称和年龄属性。接着,将这两个People对象添加到peopleList列表中。之后,调用addPeopleWriter的write方法,将peopleList作为参数传递给该方法,用于将这些People对象写入数据库。代码如下:
List<People> peopleList = new ArrayList<>();
People person1 = new People();
person1.setName("zhangsan");
person1.setAge(27);
peopleList.add(person1);
People person2 = new People();
person2.setName("lisi");
person2.setAge(35);
peopleList.add(person2);
addPeopleWriter.write(peopleList); -
输出
people表中的数据。 首先使用JdbcTemplate执行了一条查询语句SELECT * FROM people,并使用lambda表达式处理查询结果。在lambda表达式中,使用rs.getString和rs.getInt方法获取查询结果集中的字段值,并将字段值设置到一个新创建的People对象中。将每个新创建的People对象添加到一个结果列表result中。接下来打印了一行提示信息people 表中的数据:然后使用for循环遍历result列表中的每个People对象,并使用System.out.println打印输出每个对象的内容。最后打印了一条作业执行完成的信息。代码如下:
List<People> result = jdbcTemplate.query("SELECT * FROM people", (rs, rowNum) -> {
People person = new People();
person.setName(rs.getString("name"));
person.setAge(rs.getInt("age"));
return person;
});
System.out.println("people 表中的数据:");
for (People person : result) {
System.out.println(person);
}
// 输出作业执行完成后的信息
System.out.println("Batch Job execution completed.");
-
-
完整的代码展示
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.oceanbase</groupId>
<artifactId>java-oceanbase-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>java-oceanbase-springbatch</name>
<description>Demo project for Spring Batch</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jdbc</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>jakarta.persistence</groupId>
<artifactId>jakarta.persistence-api</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
#configuration database
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:oceanbase://host:port/schema_name?characterEncoding=utf-8
spring.datasource.username=user_name
spring.datasource.password=
# JPA
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
# Spring Batch
spring.batch.job.enabled=false
#
logging.level.org.springframework=INFO
logging.level.com.example=DEBUG
BatchApplication.java
package com.oceanbase.example.batch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
public void runBatchJob() {
}
}
BatchConfig.java
package com.oceanbase.example.batch.config;
import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import com.oceanbase.example.batch.processor.AddPeopleDescProcessor;
import com.oceanbase.example.batch.writer.AddDescPeopleWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import javax.sql.DataSource;
//import javax.activation.DataSource;
@Configuration
@EnableBatchProcessing
@SpringBootApplication
@ComponentScan("com.oceanbase.example.batch.writer")
@EnableAutoConfiguration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;// 使用 Spring Boot 自动配置提供的默认 dataSource
@Bean
public ItemReader<People> peopleReader() {
JdbcCursorItemReader<People> reader = new JdbcCursorItemReader<>();
reader.setDataSource((javax.sql.DataSource) dataSource);
reader.setRowMapper(new BeanPropertyRowMapper<>(People.class));
reader.setSql("SELECT * FROM people");
return reader;
}
@Bean
public ItemProcessor<People, PeopleDESC> addPeopleDescProcessor() {
return new AddPeopleDescProcessor();
}
@Bean
public ItemWriter<PeopleDESC> addDescPeopleWriter() {
return new AddDescPeopleWriter();
}
@Bean
public Step step1(ItemReader<People> reader, ItemProcessor<People, PeopleDESC> processor,
ItemWriter<PeopleDESC> writer) {
return stepBuilderFactory.get("step1")
.<People, PeopleDESC>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job importJob(Step step1) {
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(step1)
.end()
.build();
}
}
People.java
package com.oceanbase.example.batch.model;
public class People {
private String name;
private int age;
// getters and setters
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "People [name=" + name + ", age=" + age + "]";
}
// Getters and setters
}
PeopleDESC.java
package com.oceanbase.example.batch.model;
public class PeopleDESC {
private String name;
private int age;
private String desc;
private int id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "PeopleDESC [name=" + name + ", age=" + age + ", desc=" + desc + "]";
}
}
AddPeopleDescProcessor.java
package com.oceanbase.example.batch.processor;
import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.springframework.batch.item.ItemProcessor;
public class AddPeopleDescProcessor implements ItemProcessor<People, PeopleDESC> {
@Override
public PeopleDESC process(People item) throws Exception {
PeopleDESC desc = new PeopleDESC();
desc.setName(item.getName());
desc.setAge(item.getAge());
desc.setDesc("This is " + item.getName() + " with age " + item.getAge());
return desc;
}
}
AddDescPeopleWriter.java
package com.oceanbase.example.batch.writer;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.List;
public class AddDescPeopleWriter implements ItemWriter<PeopleDESC> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void write(List<? extends PeopleDESC> items) throws Exception {
// 先删除可能存在的表
jdbcTemplate.execute("DROP TABLE people_desc");
// 建表语句
String createTableSql = "CREATE TABLE people_desc (id INT PRIMARY KEY, name VARCHAR2(255), age INT, description VARCHAR2(255))";
jdbcTemplate.execute(createTableSql);
for (PeopleDESC item : items) {
String sql = "INSERT INTO people_desc (id, name, age, description) VALUES (?, ?, ?, ?)";
jdbcTemplate.update(sql, item.getId(), item.getName(), item.getAge(), item.getDesc());
}
}
}
AddPeopleWriter.java
package com.oceanbase.example.batch.writer;
import com.oceanbase.example.batch.model.People;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class AddPeopleWriter implements ItemWriter<People> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void write(List<? extends People> items) throws Exception {
// 先删除可能存在的表
jdbcTemplate.execute("DROP TABLE people");
// 建表语句
String createTableSql = "CREATE TABLE people (name VARCHAR2(255), age INT)";
jdbcTemplate.execute(createTableSql);
for (People item : items) {
String sql = "INSERT INTO people (name, age) VALUES (?, ?)";
jdbcTemplate.update(sql, item.getName(), item.getAge());
}
}
}
BatchConfigTest.java
package com.oceanbase.example.batch.config;
import com.oceanbase.example.batch.writer.AddDescPeopleWriter;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import javax.batch.runtime.BatchStatus;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest
public class BatchConfigTest {
@Test
public void testJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobParam", UUID.randomUUID().toString())
.toJobParameters();
JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
Assert.assertEquals(BatchStatus.COMPLETED.toString(), jobExecution.getStatus().toString());
}
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
private class JobLauncherTestUtils {
public JobExecution launchJob(JobParameters jobParameters) throws Exception {
return jobLauncher.run(job, jobParameters);
}
}
}
AddPeopleDescProcessorTest.java
package com.oceanbase.example.batch.processor;
import com.oceanbase.example.batch.model.People;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class AddPeopleDescProcessorTest {
@Autowired
private AddPeopleDescProcessor processor;
@Test
public void testProcess() throws Exception {
People people = new People();
// people.setName("John");
// people.setAge(25);
PeopleDESC desc = processor.process(people);
// Assert.assertEquals("John", desc.getName());
// Assert.assertEquals(25, desc.getAge());
// Assert.assertEquals("This is John with age 25", desc.getDesc());
}
}
AddDescPeopleWriterTest.java
package com.oceanbase.example.batch.writer;
import com.oceanbase.example.batch.model.PeopleDESC;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
public class AddDescPeopleWriterTest {
@Autowired
private AddDescPeopleWriter writer;
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void testWrite() throws Exception {
// 插入数据到people_desc表
List<PeopleDESC> peopleDescList = new ArrayList<>();
PeopleDESC desc1 = new PeopleDESC();
desc1.setId(1);
desc1.setName("John");
desc1.setAge(25);
desc1.setDesc("This is John with age 25");
peopleDescList.add(desc1);
PeopleDESC desc2 = new PeopleDESC();
desc2.setId(2);
desc2.setName("Alice");
desc2.setAge(30);
desc2.setDesc("This is Alice with age 30");
peopleDescList.add(desc2);
writer.write(peopleDescList);
String selectSql = "SELECT COUNT(*) FROM people_desc";
int count = jdbcTemplate.queryForObject(selectSql, Integer.class);
Assert.assertEquals(2, count);
// 输出people_desc表中的数据
List<PeopleDESC> resultDesc = jdbcTemplate.query("SELECT * FROM people_desc", (rs, rowNum) -> {
PeopleDESC desc = new PeopleDESC();
desc.setId(rs.getInt("id"));
desc.setName(rs.getString("name"));
desc.setAge(rs.getInt("age"));
desc.setDesc(rs.getString("description"));
return desc;
});
System.out.println("people_desc 表中的数据:");
for (PeopleDESC desc : resultDesc) {
System.out.println(desc);
}
// 输出作业执行完成后的信息
System.out.println("Batch Job execution completed.");
}
}
AddPeopleWriterTest.java
package com.oceanbase.example.batch.writer;
import com.oceanbase.example.batch.model.People;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootApplication
@ComponentScan("com.oceanbase.example.batch.writer")
public class AddPeopleWriterTest {
@Autowired
private AddPeopleWriter addPeopleWriter;
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void testWrite() throws Exception {
// 插入数据到people表
List<People> peopleList = new ArrayList<>();
People person1 = new People();
person1.setName("zhangsan");
person1.setAge(27);
peopleList.add(person1);
People person2 = new People();
person2.setName("lisi");
person2.setAge(35);
peopleList.add(person2);
addPeopleWriter.write(peopleList);
// 查询并输出结果
List<People> result = jdbcTemplate.query("SELECT * FROM people", (rs, rowNum) -> {
People person = new People();
person.setName(rs.getString("name"));
person.setAge(rs.getInt("age"));
return person;
});
System.out.println("people 表中的数据:");
for (People person : result) {
System.out.println(person);
}
// 输出作业执行完成后的信息
System.out.println("Batch Job execution completed.");
}
}
相关文档
更多 OceanBase Connector/J 的信息,请参见 OceanBase JDBC 驱动程序