In this article, we will set up a simple Spring Batch job that reads data from Elasticsearch, extracts a field, and writes it to a text file. In my experience, this is a common use case.
If you do not have a Spring project yet, you can generate one at https://start.spring.io/.
Let's add the Spring Batch dependencies:
<!-- Spring Batch dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
Let's add the Elasticsearch dependency:
<!-- Elasticsearch dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
Update application.properties with the properties needed to connect to Elasticsearch:
spring.data.elasticsearch.cluster-name=myCluster
# cluster-nodes configures the transport client (default port 9300); 9200 is the HTTP port
spring.data.elasticsearch.cluster-nodes=localhost:9300
Let's create a Java class that represents a record in Elasticsearch:
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
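@Document(indexName = "customer-activity") // placeholder only; the reader sets the real index on each query
public class CustomerActivity {
    @Id
    private String id;
    private String wb;
    // Getters and setters
}
Note: Do not rely on indexName here for the actual index, since this example picks the index name dynamically as part of the batch. The value above is only a placeholder, because @Document declares indexName as a required attribute.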
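Since the index is chosen at run time, one common pattern is one index per day, as in the customer-activity-2023-11-09 name used later in the configuration. A minimal sketch of deriving such a name from a date could look like this. The IndexNames class and the "customer-activity-" prefix convention are assumptions made for illustration, not part of the original setup.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a daily index name such as customer-activity-2023-11-09.
final class IndexNames {
    // Assumed naming convention: fixed prefix followed by an ISO date (yyyy-MM-dd).
    private static final String PREFIX = "customer-activity-";

    private IndexNames() {
    }

    // Returns the index name for the given day.
    static String forDate(LocalDate date) {
        return PREFIX + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
    }
}
```

For a job that runs at 1 AM, something like IndexNames.forDate(LocalDate.now().minusDays(1)) would target the previous day's index.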
Let's create a class that reads from Elasticsearch:
import java.util.Iterator;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.batch.item.ItemReader;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
public class ElasticsearchItemReader implements ItemReader<CustomerActivity> {
    private final ElasticsearchTemplate elasticsearchTemplate;
    private final String indexName;
    // Query results, fetched lazily on the first call to read()
    private Iterator<CustomerActivity> results;
    public ElasticsearchItemReader(ElasticsearchTemplate elasticsearchTemplate, String indexName) {
        this.elasticsearchTemplate = elasticsearchTemplate;
        this.indexName = indexName;
    }
    @Override
    public CustomerActivity read() {
        if (results == null) {
            // Query the index once. This fetches a single page of hits, so for
            // larger indices add withPageable(...) or switch to a scroll query.
            SearchQuery searchQuery = new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.matchAllQuery())
                    .withIndices(indexName)
                    .build();
            results = elasticsearchTemplate.queryForList(searchQuery, CustomerActivity.class).iterator();
        }
        // Hand out one item per call; null tells Spring Batch the input is exhausted
        return results.hasNext() ? results.next() : null;
    }
}
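Spring Batch calls read() repeatedly and treats a null return value as the end of the input, so a reader has to hand out one item per call and eventually return null. The plain-Java sketch below illustrates that contract without any Spring dependency. OneAtATimeReader is a hypothetical name; a real reader would implement ItemReader<T> instead.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an ItemReader: returns one item per call, then null.
final class OneAtATimeReader<T> {
    private final Iterator<T> items;

    OneAtATimeReader(List<T> source) {
        this.items = source.iterator();
    }

    // Mirrors the ItemReader#read contract: null signals that the input is exhausted.
    T read() {
        return items.hasNext() ? items.next() : null;
    }
}
```

A reader that returned the same record on every call would never signal the end of the input, and the step would not terminate.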
Let's write a processor to extract the wb field:
import org.springframework.batch.item.ItemProcessor;
public class CustomerActivityItemProcessor implements ItemProcessor<CustomerActivity, String> {
    @Override
    public String process(CustomerActivity item) {
        // Extract the "wb" field from CustomerActivity. The step collects the
        // returned values into chunks and passes each chunk to the writer.
        // Perform any other processing here if needed.
        // Note: returning null makes Spring Batch filter the item out.
        return item.getWb();
    }
}
Let's write an ItemWriter:
import org.springframework.batch.item.ItemWriter;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
public class TextFileItemWriter implements ItemWriter<String> {
    private final String filePath;
    public TextFileItemWriter(String filePath) {
        this.filePath = filePath;
    }
    @Override
    public void write(List<? extends String> items) throws IOException {
        // write() is called once per chunk, so open the file in append mode
        // to avoid overwriting earlier chunks. try-with-resources closes the writer.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath, true))) {
            // Iterate through the chunk and write each string on its own line
            for (String str : items) {
                writer.write(str);
                writer.newLine(); // Add a newline after each string
            }
        }
        // An IOException propagates to Spring Batch, which marks the step as failed
    }
}
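Because the writer is invoked once per chunk, how the file is opened matters: in overwrite mode only the last chunk would survive. The following sketch shows the append behaviour in isolation using java.nio. ChunkFileAppender is a hypothetical helper, not a Spring Batch class, and it is only one of several ways to append lines to a file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper: appends each chunk of lines to the end of a file.
final class ChunkFileAppender {
    private ChunkFileAppender() {
    }

    // Creates the file if it is missing, otherwise appends to the existing content.
    static void append(Path file, List<String> lines) throws IOException {
        Files.write(file, lines, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

Note that appending also means output accumulates across job runs, so you may want a file name per run or to delete the old file before the job starts.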
Final step: let's put the above pieces together in the configuration:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }
    @Bean
    @StepScope // a fresh reader per step execution, so repeated runs start from the beginning
    public ItemReader<CustomerActivity> reader(ElasticsearchTemplate elasticsearchTemplate) {
        // Create and return the ElasticsearchItemReader
        String indexName = "customer-activity-2023-11-09";
        return new ElasticsearchItemReader(elasticsearchTemplate, indexName);
    }
    @Bean
    public ItemProcessor<CustomerActivity, String> processor() {
        // Create and return the processor that extracts the wb field
        return new CustomerActivityItemProcessor();
    }
    @Bean
    public ItemWriter<String> writer(@Value("${output.file:file.txt}") String filePath) {
        // Create and return the TextFileItemWriter; falls back to file.txt if output.file is not set
        return new TextFileItemWriter(filePath);
    }
    @Bean
    public Step myStep(ItemReader<CustomerActivity> reader, ItemProcessor<CustomerActivity, String> processor, ItemWriter<String> writer) {
        return stepBuilderFactory.get("myStep")
                .<CustomerActivity, String>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
    @Bean
    public Job myJob(Step myStep) {
        return jobBuilderFactory.get("myJob")
                .start(myStep)
                .build();
    }
}
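With chunk(10), the step reads and processes items one at a time, collects the processed results, and hands them to the writer in lists of up to 10. The plain-Java sketch below imitates that loop to make the flow concrete. ChunkLoop and its functional parameters are hypothetical and heavily simplified compared with what Spring Batch actually does (no transactions, filtering, retries, or restart metadata).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified imitation of a chunk-oriented step.
final class ChunkLoop {
    private ChunkLoop() {
    }

    // Reads until the reader returns null, writing processed items chunk by chunk.
    // Returns the number of chunks handed to the writer.
    static <I, O> int run(Supplier<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int chunksWritten = 0;
        List<O> chunk = new ArrayList<>();
        I item;
        while ((item = reader.get()) != null) {
            chunk.add(processor.apply(item));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // hand over a full chunk
                chunk.clear();
                chunksWritten++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // flush the final, partial chunk
            chunksWritten++;
        }
        return chunksWritten;
    }
}
```

Under this simplified model, 25 input records with a chunk size of 10 result in three writer calls with 10, 10, and 5 items.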
Optionally, you can schedule the job to run at 1 AM every day. Scheduling must be enabled for this to work, for example with @EnableScheduling on a configuration class:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class ScheduledJobLauncher {
    private final JobLauncher jobLauncher;
    private final Job myJob;
    @Autowired
    public ScheduledJobLauncher(JobLauncher jobLauncher, Job myJob) {
        this.jobLauncher = jobLauncher;
        this.myJob = myJob;
    }
    @Scheduled(cron = "0 0 1 * * ?") // Run at 1:00 AM every day
    public void runJob() {
        try {
            // A unique parameter makes every run a new job instance
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(myJob, jobParameters);
        } catch (Exception e) {
            // Surface the failure instead of silently swallowing it
            throw new IllegalStateException("Failed to launch myJob", e);
        }
    }
}
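The cron expression 0 0 1 * * ? has six fields (second, minute, hour, day of month, month, day of week), so it fires at 01:00:00 every day. As a rough illustration of what that means for the next run time, here is a small plain-Java sketch. DailySchedule is a hypothetical helper that ignores time zones and daylight-saving shifts, and it is not how Spring evaluates cron expressions internally.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical helper: computes the next 1:00 AM strictly after a given moment.
final class DailySchedule {
    private static final LocalTime RUN_AT = LocalTime.of(1, 0);

    private DailySchedule() {
    }

    // Returns today's 1:00 AM if it is still ahead, otherwise tomorrow's.
    static LocalDateTime nextRunAfter(LocalDateTime now) {
        LocalDateTime todayRun = now.toLocalDate().atTime(RUN_AT);
        return now.isBefore(todayRun) ? todayRun : todayRun.plusDays(1);
    }
}
```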
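This marks the end of the tutorial. Once you start the application, the job will run every day at 1 AM, read data from the configured index, and write it to a local file. Note that Spring Boot also launches batch jobs once at startup by default; set spring.batch.job.enabled=false if the job should only run on the schedule.
From there, you could also upload the file to an SFTP server, S3, or any other location.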