Spring Batch with ElasticSearch

Spring Boot 2.X and ElasticSearch 7.X

·

4 min read

In this article, we will set up a simple Spring Batch job that reads data from Elasticsearch, extracts a single field, and writes the values to a text file. In my experience, this is a common use case.

If you do not have a Spring project, you can go ahead and download from https://start.spring.io/.

Let's add the Spring Batch-related dependencies:


    <!-- Spring Batch dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

Let's add the Elasticsearch-related dependencies:

    <!-- Elasticsearch dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

Update application.properties with the properties needed to connect to Elasticsearch:

spring.data.elasticsearch.cluster-name=myCluster
spring.data.elasticsearch.cluster-nodes=localhost:9300

Note: cluster-nodes points at the transport port (9300 by default); 9200 is the HTTP/REST port.

Let's create a Java class that represents a record in Elasticsearch:

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

/**
 * Maps a single Elasticsearch record.
 *
 * The index name is deliberately NOT set on @Document so the batch job can
 * target an index chosen at runtime (see the reader configuration below).
 */
@Document
public class CustomerActivity {
    @Id
    private String id;
    private String wb;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getWb() {
        return wb;
    }

    public void setWb(String wb) {
        this.wb = wb;
    }
}

Note: Do not put indexName here, since this example expects IndexName to be dynamically picked as part of the batch.

Let's create a class to read from Elasticsearch:

import java.util.Iterator;

import org.springframework.batch.item.ItemReader;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

/**
 * Reads CustomerActivity documents from the configured index.
 *
 * The query is executed once, lazily, on the first read() call; subsequent
 * calls drain the result iterator and finally return null, which signals
 * end-of-input to Spring Batch. (The original version re-ran the query and
 * returned a document on every call, so the step could never terminate.)
 */
public class ElasticsearchItemReader implements ItemReader<CustomerActivity> {
    private final ElasticsearchTemplate elasticsearchTemplate;
    private final String indexName;

    // Populated on the first read(); null means the query has not run yet.
    private Iterator<CustomerActivity> results;

    public ElasticsearchItemReader(ElasticsearchTemplate elasticsearchTemplate, String indexName) {
        this.elasticsearchTemplate = elasticsearchTemplate;
        this.indexName = indexName;
    }

    @Override
    public CustomerActivity read() {
        if (results == null) {
            SearchQuery searchQuery = new NativeSearchQueryBuilder()
                    .withIndices(indexName)
                    .build();
            results = elasticsearchTemplate
                    .queryForList(searchQuery, CustomerActivity.class)
                    .iterator();
        }
        // null = "no more input" per the ItemReader contract.
        return results.hasNext() ? results.next() : null;
    }
}

Let's write a processor to extract the wb field:

import org.springframework.batch.item.ItemProcessor;

/**
 * Extracts the "wb" field from each CustomerActivity.
 *
 * The output type is String (not List&lt;String&gt;) so that it matches the
 * step's chunk types and the ItemWriter&lt;String&gt; defined below.
 * Returning null when the field is absent tells Spring Batch to filter the
 * record out instead of writing an empty line.
 */
public class CustomerActivityItemProcessor implements ItemProcessor<CustomerActivity, String> {

    @Override
    public String process(CustomerActivity item) {
        // A null return filters the item out of the chunk (standard
        // Spring Batch contract), so records without a wb value are skipped.
        return item.getWb();
    }
}

Let's write an ItemWriter:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.List;

import org.springframework.batch.item.ItemWriter;

/**
 * Writes each String item as one line of the configured output file.
 *
 * write() is invoked once per chunk, so the file is opened in append mode —
 * opening it fresh in overwrite mode would discard every previous chunk.
 * Exceptions propagate (ItemWriter.write declares "throws Exception") so a
 * failed write fails the step instead of being silently swallowed.
 */
public class TextFileItemWriter implements ItemWriter<String> {
    private final String filePath;

    public TextFileItemWriter(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public void write(List<? extends String> items) throws Exception {
        // try-with-resources guarantees the writer is closed even on failure.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath, true))) {
            for (String item : items) {
                writer.write(item);
                writer.newLine(); // one item per line
            }
        }
    }
}

Final step: let's put the above pieces together in the configuration:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public ItemReader<CustomerActivity> reader(ElasticsearchRestTemplate elasticsearchRestTemplate) {
        // Create and return the ElasticsearchItemReader
        String indexName = "customer-activity-2023-11-09"
        return new ElasticsearchItemReader(elasticsearchRestTemplate, indexName);
    }

    @Bean
    public ItemWriter<String> writer(@Value("${output.file}") String filePath) {
        // Create and return the TextFileItemWriter
        return new TextFileItemWriter(filePath);
    }

    @Bean
    public Step myStep(ItemReader<CustomerActivity> reader, ItemProcessor<CustomerActivity, List<String>> processor, ItemWriter<String> writer) {
        return stepBuilderFactory.get("myStep")
                .<CustomerActivity, String>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job myJob(Step myStep) {
        return jobBuilderFactory.get("myJob")
                .start(myStep)
                .build();
    }
}

Optionally, you can schedule the job to run at 1 AM every day:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class ScheduledJobLauncher {

    private final Job myJob;  

    @Autowired
    public ScheduledJobLauncher(Job myJob) {
        this.myJob = myJob;
    }

    @Scheduled(cron = "0 0 1 * * ?")  // Run at 1:00 AM every day
    public void runJob() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();



            myJobLauncher.run(myJob, jobParameters);
        } catch (Exception e) {
            // Handle exceptions
        }
    }
}

This marks the end of this tutorial. Once you start the application, every day at 1 AM it will read the data from the configured index and write it to a local file.

You can actually choose to upload to SFTP server, S3 or any other location.