
Spring Cloud Stream Using Kafka




Recently I've been digging into event-driven architectures for service communication. For this demo I followed the broker approach instead of the feed approach, as the broker approach has a dedicated broker responsible for distributing events to any number of subscribers.

For the demo we will use the popular open-source broker Apache Kafka (https://kafka.apache.org).

As Spring (http://spring.io) always makes life easy 😎, the Spring Cloud Stream project makes Kafka integration in your application very easy 👍 with just a few annotations.

Spring Cloud Stream provides Binder implementations for Kafka, RabbitMQ, Redis, and GemFire.

Note: Before we proceed, make sure you have installed ZooKeeper and Kafka and that both are running.



Dependencies: Add the following dependencies to your POM for using Kafka.
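A minimal sketch, assuming Maven with dependency versions managed by the Spring Cloud BOM (the web starter is included for the REST endpoint used later):

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>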




Use Case: We will have one producer (the demo-producer application) and two consumers (the demo-consumer and demo-consumer1 applications).

The producer raises an event on the message broker on the topic "demochannel". Both consumers subscribe to the same topic "demochannel" on the broker. This means that as soon as an event is raised, the broker notifies each subscriber so it can take action.



Producing Events: Create a Spring Boot project (demo-producer) with the above dependencies. Create the main application class, DemoProducerApplication.java, as below.
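A minimal sketch; @EnableBinding wires the SourceChannel interface defined in the next step, and @EnableScheduling is needed for the timed producer shown later:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.scheduling.annotation.EnableScheduling;

    @SpringBootApplication
    @EnableBinding(SourceChannel.class) // binds the channels declared in SourceChannel
    @EnableScheduling                   // enables the scheduled producer shown below
    public class DemoProducerApplication {

        public static void main(String[] args) {
            SpringApplication.run(DemoProducerApplication.class, args);
        }
    }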


Create the SourceChannel interface.

The @Output annotation identifies an output channel, through which published messages leave the application.
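A minimal sketch; the channel name demochannel is taken from the use case above and is mapped to the Kafka topic in application.properties:

    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    public interface SourceChannel {

        // logical channel name, bound to the Kafka topic in application.properties
        String OUTPUT = "demochannel";

        @Output(OUTPUT)
        MessageChannel output();
    }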

Create a MessageProducer class which publishes a message every second.
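A sketch using Spring's @Scheduled (the exact payload format is an assumption):

    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    @Component
    public class MessageProducer {

        private final SourceChannel sourceChannel;

        public MessageProducer(SourceChannel sourceChannel) {
            this.sourceChannel = sourceChannel;
        }

        // publish one message per second on the output channel
        @Scheduled(fixedRate = 1000)
        public void sendEvent() {
            String payload = "Event produced at " + System.currentTimeMillis();
            sourceChannel.output().send(MessageBuilder.withPayload(payload).build());
        }
    }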


Add the @EnableBinding annotation to your application to get immediate connectivity to the message broker; in the sketch above it is applied to the main application class.

Apart from raising an event every second, we will also create events via REST. So define a REST controller.
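A sketch; the endpoint path /publish/{message} is a hypothetical mapping, not necessarily the one from the original post:

    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class ProducerController {

        private final SourceChannel sourceChannel;

        public ProducerController(SourceChannel sourceChannel) {
            this.sourceChannel = sourceChannel;
        }

        // hypothetical endpoint: publishes the path variable as an event
        @GetMapping("/publish/{message}")
        public String publish(@PathVariable String message) {
            sourceChannel.output().send(MessageBuilder.withPayload(message).build());
            return "Event published: " + message;
        }
    }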


application.properties looks like this.
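A minimal sketch for a local setup (port and broker address are assumptions); the binding key demochannel matches the channel name declared in SourceChannel:

    server.port=8080
    spring.cloud.stream.bindings.demochannel.destination=demochannel
    spring.cloud.stream.kafka.binder.brokers=localhost:9092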




Consuming Events: Create a Spring Boot project (demo-consumer) with the above dependencies. Create the main application class: DemoConsumerApplication.java.
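A minimal sketch; the @StreamListener method logs each event received on the channel declared by the Sink interface defined in the next step:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;

    @SpringBootApplication
    @EnableBinding(Sink.class) // Sink is the custom binding interface defined below
    public class DemoConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(DemoConsumerApplication.class, args);
        }

        // log every event received on the subscribed channel
        @StreamListener(Sink.INPUT)
        public void consume(String message) {
            System.out.println("Received event: " + message);
        }
    }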

Create the Sink interface.


The @Input annotation identifies an input channel, through which received messages enter the application.
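A minimal sketch (note that Spring Cloud Stream also ships a built-in Sink interface, so make sure imports point at this custom one):

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;

    public interface Sink {

        // logical channel name, bound to the Kafka topic in application.properties
        String INPUT = "demochannel";

        @Input(INPUT)
        SubscribableChannel input();
    }

The consumer's application.properties, assuming the same local broker and a different port than the producer:

    server.port=8081
    spring.cloud.stream.bindings.demochannel.destination=demochannel
    spring.cloud.stream.kafka.binder.brokers=localhost:9092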



Follow exactly the same steps as for demo-consumer (defined above) to create the demo-consumer1 project. Once all the required files are created, change application.properties as below (only the server port needs to change).
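A sketch, identical to demo-consumer's configuration except for the assumed port:

    server.port=8082
    spring.cloud.stream.bindings.demochannel.destination=demochannel
    spring.cloud.stream.kafka.binder.brokers=localhost:9092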






Steps to Execute

Start ZooKeeper from your installation: sh zkServer.sh start

Start Kafka Server: sh bin/kafka-server-start.sh config/server.properties

Start the producer application (demo-producer).

Start both consumer applications (demo-consumer & demo-consumer1).

Check the console output of both consumers.


Hit the REST endpoint using curl. Once the request is served, it raises an event which is consumed by both subscribers.
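Assuming the hypothetical endpoint from the controller sketch above:

    curl http://localhost:8080/publish/hello

Both consumer consoles should then print the same "hello" event.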

Cheers – Happy learning 🙂

Ketan Gote
