
Spring Batch with ElasticSearch

Spring Boot 2.X and ElasticSearch 7.X


In this article, we will set up a simple Spring Batch job that reads data from Elasticsearch, extracts a field from each document, and writes the values to a text file. In my experience, this is a common use case.

If you do not have a Spring Boot project yet, you can generate one at https://start.spring.io/.

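Let's add the Spring Batch dependencies. Spring Batch stores its job metadata in a relational database, so a JDBC driver is needed as well; H2 is used here as an in-memory example:

    <!-- Spring Batch dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- Database for the Spring Batch job repository (in-memory; use your own database in production) -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>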

Let's add the Elasticsearch dependency:

    <!-- Elasticsearch dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

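Update application.properties with the Elasticsearch connection settings and the output file location. The old cluster-name/cluster-nodes properties belong to the transport client (port 9300); with Elasticsearch 7.x the REST client on port 9200 is used instead:

# Elasticsearch REST endpoint (Spring Boot 2.2-2.5; on Boot 2.6+ the property is spring.elasticsearch.uris)
spring.elasticsearch.rest.uris=http://localhost:9200
# Output file used by the writer bean (example path)
output.file=customer-activity.txt
# Set to false if the job should only be started by the scheduler, not at application startup
spring.batch.job.enabled=false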

Let's create a Java class that maps to a document in Elasticsearch:

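import org.springframework.data.annotation.Id;

public class CustomerActivity {
    @Id
    private String id;
    private String wb;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getWb() {
        return wb;
    }

    public void setWb(String wb) {
        this.wb = wb;
    }
}

Note: the class is intentionally not annotated with @Document. In Spring Data Elasticsearch, @Document requires an indexName, and this example picks the index name dynamically as part of the batch, passing it to each query instead.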
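The configuration later in this tutorial hard-codes the index name customer-activity-2023-11-09. If indices are created per day, a small helper can derive the name at run time. This is a minimal sketch assuming a customer-activity-yyyy-MM-dd naming scheme (inferred from that example index); DailyIndexNames, forDate and forYesterday are hypothetical names, and processing "yesterday" at 1 AM is an assumption, not something the original job does.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds daily index names such as "customer-activity-2023-11-09".
// The prefix and date pattern are assumptions based on the example index name.
final class DailyIndexNames {
    private static final String PREFIX = "customer-activity-";
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    private DailyIndexNames() {
    }

    // Index name for an explicit date (useful for re-running a past day)
    static String forDate(LocalDate date) {
        return PREFIX + DAY.format(date);
    }

    // Assumption: a job running at 1 AM processes the previous day's index
    static String forYesterday(ZoneId zone) {
        return forDate(LocalDate.now(zone).minusDays(1));
    }

    public static void main(String[] args) {
        System.out.println(forDate(LocalDate.of(2023, 11, 9))); // customer-activity-2023-11-09
    }
}
```

Rather than calling LocalDate.now() deep inside the job, passing the computed name in as a job parameter (for example, read by a @StepScope reader bean) makes it easy to re-run a specific day.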

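Let's create a reader class that streams every document from the index. It uses the Spring Data Elasticsearch 4.x API (Spring Boot 2.3 and later, the line that targets Elasticsearch 7.x):

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHitsIterator;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

public class ElasticsearchItemReader implements ItemStreamReader<CustomerActivity> {
    private final ElasticsearchOperations elasticsearchOperations;
    private final String indexName;
    // Scroll-backed iterator over all hits; opened once per step execution
    private SearchHitsIterator<CustomerActivity> hits;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, String indexName) {
        this.elasticsearchOperations = elasticsearchOperations;
        this.indexName = indexName;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        Query query = new NativeSearchQueryBuilder()
                .withQuery(matchAllQuery())
                .build();
        // searchForStream uses the scroll API, so large indices are read page by page
        hits = elasticsearchOperations.searchForStream(query, CustomerActivity.class, IndexCoordinates.of(indexName));
    }

    @Override
    public CustomerActivity read() {
        // One document per call; returning null tells Spring Batch the input is exhausted
        return hits.hasNext() ? hits.next().getContent() : null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // No restart state is saved in this simple example
    }

    @Override
    public void close() {
        // Releases the scroll context on the Elasticsearch side
        if (hits != null) {
            hits.close();
        }
    }
}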
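The key rule for any ItemReader is that read() returns one item per call and returns null once the input is exhausted; that null is how Spring Batch knows the step is finished. The plain-Java sketch below shows the contract with a hypothetical SketchReader interface (mirroring the shape of ItemReader, but without depending on Spring) backed by an iterator. Spring Batch itself ships a similar IteratorItemReader in org.springframework.batch.item.support.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-in for Spring Batch's ItemReader<T>, so the sketch runs without Spring.
interface SketchReader<T> {
    T read();
}

// Iterator-backed reader: hands out one item per call, then null to signal "no more input".
final class IteratorSketchReader<T> implements SketchReader<T> {
    private final Iterator<T> iterator;

    IteratorSketchReader(Iterable<T> source) {
        this.iterator = source.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }

    public static void main(String[] args) {
        SketchReader<String> reader = new IteratorSketchReader<>(Arrays.asList("a", "b"));
        String item;
        // The loop ends only because read() eventually returns null
        while ((item = reader.read()) != null) {
            System.out.println(item);
        }
    }
}
```

A read() that re-runs the same query on every call never returns null, so a step built on it would never complete.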

Let's write a processor that extracts the wb field. It returns a single String per item, matching the String type that the writer and the chunk configuration expect:

import org.springframework.batch.item.ItemProcessor;

public class CustomerActivityItemProcessor implements ItemProcessor<CustomerActivity, String> {

    @Override
    public String process(CustomerActivity item) {
        // Extract the "wb" field from CustomerActivity
        String wbValue = item.getWb();

        // Perform any other processing if needed.
        // Returning null instead would filter the item out so it is never written.
        return wbValue;
    }
}
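A processor can also act as a filter: in Spring Batch, returning null from process() drops the item so it never reaches the writer. The sketch below mimics that rule in plain Java, assuming you want to skip activities whose wb value is missing or blank; WbExtractor and its nested Activity class are hypothetical stand-ins for CustomerActivityItemProcessor and CustomerActivity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WbExtractor {
    // Minimal stand-in for CustomerActivity (hypothetical, for illustration only)
    static final class Activity {
        final String id;
        final String wb;

        Activity(String id, String wb) {
            this.id = id;
            this.wb = wb;
        }
    }

    // Same rule as ItemProcessor.process(): return the output item, or null to filter it out
    static String process(Activity item) {
        String wb = item.wb;
        if (wb == null || wb.trim().isEmpty()) {
            return null; // assumption: blank values are not worth writing
        }
        return wb.trim();
    }

    // What the step does with processor output: only non-null results go on to the writer
    static List<String> processAll(List<Activity> items) {
        List<String> out = new ArrayList<>();
        for (Activity item : items) {
            String result = process(item);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Activity> input = Arrays.asList(
                new Activity("1", "alpha"), new Activity("2", null),
                new Activity("3", "  "), new Activity("4", "beta"));
        System.out.println(processAll(input)); // [alpha, beta]
    }
}
```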

Let's write an ItemWriter that appends each value to a text file:

import org.springframework.batch.item.ItemWriter;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class TextFileItemWriter implements ItemWriter<String> {
    private final String filePath;

    public TextFileItemWriter(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public void write(List<? extends String> items) throws IOException {
        // write() is called once per chunk, so open the file in APPEND mode;
        // otherwise every chunk would overwrite the previous one.
        // The file is not truncated between runs, so delete or rotate it beforehand if needed.
        try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (String str : items) {
                writer.write(str);
                writer.newLine(); // Add a newline after each string
            }
        }
        // IOExceptions are not caught here: propagating them lets Spring Batch mark the step as FAILED
    }
}
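Because Spring Batch calls write() once per chunk, a writer that reopens the file without append mode keeps only the last chunk. The plain-Java sketch below simulates several chunk writes to a temporary file with StandardOpenOption.APPEND and checks that every line survives; ChunkAppendDemo is a hypothetical name. For production jobs, Spring Batch's FlatFileItemWriter, which keeps the file open for the whole step and supports restarts, is usually the better choice.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

final class ChunkAppendDemo {
    // Writes one chunk, appending to whatever earlier chunks already wrote
    static void writeChunk(Path file, List<String> items) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (String item : items) {
                writer.write(item);
                writer.newLine();
            }
        }
    }

    // Simulates a step delivering several chunks and returns the resulting file contents
    static List<String> simulate(List<List<String>> chunks) throws IOException {
        Path file = Files.createTempFile("wb-values", ".txt");
        try {
            for (List<String> chunk : chunks) {
                writeChunk(file, chunk);
            }
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = simulate(Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e")));
        System.out.println(lines); // [a, b, c, d, e]
    }
}
```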

Final step: let's wire the pieces together in the batch configuration:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public ItemReader<CustomerActivity> reader(ElasticsearchOperations elasticsearchOperations) {
        // Create and return the ElasticsearchItemReader
        String indexName = "customer-activity-2023-11-09";
        return new ElasticsearchItemReader(elasticsearchOperations, indexName);
    }

    @Bean
    public ItemProcessor<CustomerActivity, String> processor() {
        return new CustomerActivityItemProcessor();
    }

    @Bean
    public ItemWriter<String> writer(@Value("${output.file}") String filePath) {
        // Create and return the TextFileItemWriter
        return new TextFileItemWriter(filePath);
    }

    @Bean
    public Step myStep(ItemReader<CustomerActivity> reader,
                       ItemProcessor<CustomerActivity, String> processor,
                       ItemWriter<String> writer) {
        // Reads CustomerActivity, writes String, commits every 10 items
        return stepBuilderFactory.get("myStep")
                .<CustomerActivity, String>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job myJob(Step myStep) {
        return jobBuilderFactory.get("myJob")
                .start(myStep)
                .build();
    }
}
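The .<CustomerActivity, String>chunk(10) line sets the commit interval: the step reads up to 10 items, passes each one through the processor, hands the non-null results to the writer in a single write() call, and then commits. The plain-Java loop below is a simplified sketch of that cycle only (no transactions, retry, skip, or restart), using java.util.function types in place of the Spring Batch interfaces; ChunkLoopSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified model of a chunk-oriented step (no transactions, retry, skip or restart).
final class ChunkLoopSketch {
    static <I, O> void run(Supplier<I> reader, Function<I, O> processor,
                           Consumer<List<O>> writer, int chunkSize) {
        while (true) {
            // 1. Read up to chunkSize items; a null from the reader means the input is exhausted
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < chunkSize && (item = reader.get()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return; // nothing left: the step is complete
            }
            // 2. Process each item; null results are filtered out
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                O out = processor.apply(input);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. One write per chunk (this sketch skips the write if every item was filtered)
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int n = 1; n <= 25; n++) {
            source.add(n);
        }
        Iterator<Integer> it = source.iterator();
        List<Integer> chunkSizes = new ArrayList<>();
        ChunkLoopSketch.<Integer, String>run(() -> it.hasNext() ? it.next() : null,
                n -> "wb-" + n, chunk -> chunkSizes.add(chunk.size()), 10);
        System.out.println(chunkSizes); // [10, 10, 5]
    }
}
```

With 25 input items and a chunk size of 10, the writer is called three times, so a failure in the last chunk only rolls back those 5 items.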

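Optionally, you can schedule the job to run at 1 AM every day:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling // Enables @Scheduled processing; this can also go on your @SpringBootApplication class
public class ScheduledJobLauncher {

    private static final Logger log = LoggerFactory.getLogger(ScheduledJobLauncher.class);

    private final JobLauncher jobLauncher;
    private final Job myJob;

    @Autowired
    public ScheduledJobLauncher(JobLauncher jobLauncher, Job myJob) {
        this.jobLauncher = jobLauncher;
        this.myJob = myJob;
    }

    @Scheduled(cron = "0 0 1 * * ?")  // Run at 1:00 AM every day
    public void runJob() {
        try {
            // A unique "time" parameter creates a new JobInstance for each run,
            // so the same job can be launched again the next day
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();

            jobLauncher.run(myJob, jobParameters);
        } catch (Exception e) {
            // Log instead of silently swallowing the failure
            log.error("Scheduled run of myJob failed", e);
        }
    }
}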
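Spring's cron expressions have six fields (second, minute, hour, day of month, month, day of week), so "0 0 1 * * ?" fires at 01:00:00 every day, in the server's default time zone unless the zone attribute of @Scheduled is set. The plain-Java sketch below computes the next trigger time for that daily schedule with java.time, which can be handy when reasoning about or testing the schedule; NextRunSketch is a hypothetical helper, not a Spring API, and it ignores time-zone and daylight-saving transitions.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical helper: next time a daily "0 0 1 * * ?" schedule would fire after 'now'.
final class NextRunSketch {
    private static final LocalTime RUN_AT = LocalTime.of(1, 0);

    static LocalDateTime nextRun(LocalDateTime now) {
        LocalDateTime today = now.toLocalDate().atTime(RUN_AT);
        // Strictly after 'now': if 01:00 today has passed (or is exactly now), use tomorrow
        return today.isAfter(now) ? today : today.plusDays(1);
    }

    public static void main(String[] args) {
        System.out.println(nextRun(LocalDateTime.of(2023, 11, 9, 0, 30))); // 2023-11-09T01:00
        System.out.println(nextRun(LocalDateTime.of(2023, 11, 9, 1, 0)));  // 2023-11-10T01:00
    }
}
```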

This marks the end of this tutorial. Once you start the application, it will read the data from the configured index every day at 1 AM and write the extracted values to a local file.

Instead of writing to a local file, you could also upload the output to an SFTP server, S3, or any other location.



Smit Shah's Blog
