Skip to Main Content

How to configure WSO2 Apache Kafka Connector with WSO2 Micro Integrator

Kafka Process Flow

In this tutorial, we will discuss how to configure the WSO2 Apache Kafka Connector with WSO2 Micro Integrator. To achieve this we will first discuss the components we require for the setup, the steps required to install and configure the environment along with subsequent options to create and deploy an application. Finally, we will test the entire flow.

For this purpose, the below index shall be followed:

  1. Components we require for the setup
  2. Architecture Overview 
    1. Our Process Flow
  3. Download and Configure the Apache Kafka, WSO2 Micro Integrator and WSO2 Integration Studio

    1. Download components
    2. Configure components
  4. Creating a project in WSO2 Integration Studio [Design-time]
    1. Create the proxy service in WSO2 Studio
    2. Exporting Integration Logic as a C-Application (Carbon Application)
    3. Add Kafka Connector into Project
    4. Build the Project Archive: [*.car]
  5. Start the Micro Integrator [Run-time] 
    1. Start the MI runtime along with CLI tool
  6. Deployment to Runtime 
    1. Deploy the application
    2. Confirm the deployed proxy services are running using the CLI tool
  7. Testing to confirm whether E2E Process Flow works

    1. Send Client Request using Curl Command
    2. Send Client request using internal HTTP Client within Studio
    3. Send Client request using SOAP UI
  8. Conclusions

1.Components we require for the setup

  • Apache Kafka: [Ver. kafka_2.12–1.0.0]

Apache Kafka is a distributed messaging system, providing fast, highly scalable messaging through a publisher-subscriber model. It is highly available, resilient to node failures and supports automatic recovery. Apache Kafka implements topics that can handle a high volume of data and has an enabler to pass on the messages from one endpoint to another. Apache Kafka is suitable for both offline and online message consumption. Apache Kafka is built on top of the Apache ZooKeeperTM synchronization service. All Kafka messages are organized into topics.

  • WSO2 Apache Kafka Connector: [Ver. 3.0.0]

The WSO2 Kafka connector allows you to access the Kafka Producer API through WSO2 EI. Hence, Kafka connector acts as a message producer which facilitates publishing messages from WSO2 EI to Kafka topics. Inbound Endpoint : Kafka inbound endpoint acts as a message consumer for Kafka. It receives messages from configured topics of the Kafka platform and injects them into the mediation flow.

  • WSO2 Integration Studio: [Ver. 7.2.0]

WSO2 Integration Studio is a drag-and-drop graphical development environment for WSO2 Enterprise Integrator. It provides efficient integration artefact development and accelerates development lifecycles.

  • WSO2 Micro Integrator: [Ver. wso2mi-1.2.0]

WSO2 Micro Integrator is an open-source, cloud-native integration framework with a graphical drag-and-drop integration flow designer and a configuration-based runtime for integrating APIs, services, data, and SaaS, proprietary, and legacy systems.

This is based on the same broadly adopted and battle-tested WSO2 EI/WSO2 ESB runtime used in previous versions. It has been optimized for container-native deployments based on Docker and Kubernetes.

Micro Integrator supports both centralized (ESB style) and decentralized (microservices, cloud native) architectural styles.

  • SOAP-UI: [Ver. 5.5.0]

SoapUI is the world’s leading functional testing tool for SOAP and REST testing. With its easy-to-use graphical interface, and enterprise-class features, SoapUI allows you to easily and rapidly create and execute automated functional, regression, and load tests. In a single test environment, SoapUI provides complete test coverage – from SOAP and REST-based Web services, to JMS enterprise messaging layers, databases, Rich Internet Applications, and much more. 

2. Architecture Overview

The following diagram describes the relationship between the Apache Kafka Server, the WSO2 Micro Integrator (with the Apache Kafka connector) and a SoapUI based client.

2.1 Our Process Flow:

In our scenario/exercise, we will use the SoapUI client to send a message request to the WSO2 Proxy Service, which internally publishes a message to a Kafka topic on the Apache Kafka Server.

3. Download and Configure the Apache Kafka Connector, WSO2 Micro Integrator and WSO2 Integration Studio

3.1 Download components

  • Download & extract WSO2 Micro Integrator here 
  • We will refer to this directory as <MI_HOME>
  • Download WSO2 Integration Studio here
  • Download and extract Apache Kafka here
  • We will refer to this directory as <KAFKA_HOME>

3.2 Configure components

The steps to follow in order to configure the components are as follows:

  1. Copy the following client libraries from <KAFKA_HOME>/lib to <MI_HOME>/lib.
  • kafka_2.12–1.0.0.jar,
  • kafka-clients-1.0.0.jar,
  • metrics-core-2.2.0.jar,
  • scala-library-2.12.3.jar,
  • zkclient-0.10.jar &
  • zookeeper-3.4.10.jar.
  1. Navigate to the <KAFKA_HOME>/bin & run the following command to start the ZooKeeper.

      ./zookeeper-server-start.sh config/zookeeper.properties

3. In another console navigate to the <KAFKA_HOME>/bin & run the following command to start the Kafka Broker Server.

./kafka-server-start.sh config/server.properties

4. Creating a project in WSO2 Integration Studio [Design-time]

4.1 Create the proxy service in WSO2 Studio

In order to create the proxy service in WSO2 Studio the following steps must be followed:

1. Create a ESB solution project and add WSO2 kafka connector into it.

File-> New -> Integration Project.

2. Provide Integration Project Name ‘MyKafkaConnectionProject’, & click Finish.

3. Right click on the project name (node: MyKafkaConnectionProjectConfig) & select ‘Add or Remove Connector/Module’ option as below:

4. Select ‘Add connector/module’ option ->

5. Search for the ‘Kafka’ connector & then click on ‘download’ icon to get into workspace.

 6. Confirm on pop-up appears on screen to download the connector into your Workspace.

  7. Acknowledge the 2nd pop-up by clicking on ‘OK’ & finally click on ‘Finish’ on ‘Add or Remove Connectors/Modules’ screen.

  8. To create a proxy service in the next step, we need a proxy configuration file (say ‘kafkaProxyService.xml’ available on local system)      with following details into it,

  • bootstrapServers,
  • topic,
  • maxPoolSize,
  • keySerializerClass  & valueSerializerClass
<?xml version="1.0" encoding="UTF-8"?>

<proxy name="kafkaProxyService" startOnLoad="true" transports="https http" xmlns="http://ws.apache.org/ns/synapse">

    <target>

        <inSequence>

            <kafkaTransport.init>

                <bootstrapServers>localhost:9092</bootstrapServers>

               <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>

                 <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>

                <maxPoolSize>100</maxPoolSize>

            </kafkaTransport.init>

            <kafkaTransport.publishMessages>

                <topic>MyKafkaTopic</topic>

            </kafkaTransport.publishMessages>

        </inSequence>

        <outSequence/>

        <faultSequence/>

    </target>

</proxy>

9. After adding the connector, right click on the project name -> New -> Proxy service 

10. Select ‘Import Proxy Service’ and click on Next.

 

11. Reference the ‘kafkaProxyService.xml’ from local file system which we have kept ready in step 7.

12. You can switch to design view and check the configuration.

4.2 Exporting Integration Logic as a C-Application (Carbon Application) 

   [Note: This will create a place holder for connector libs into the same project]

C-Application is the deployable artefact on the Enterprise Integrator runtime. 

We can export integration logic we developed into a C-App along with the connector. Now we can export the imported connector and the API into a single CAR application. 

The CAR application needs to be deployed during server runtime [C-Application is also know an ‘Composite Application’ in new releases].

In order to bundle Connector into a C-Application, a Connector Exporter Project is needed: 

Step 1: Navigate to File -> New -> Other -> WSO2 -> Extensions -> Project Types –> ‘Connector Exporter’ Project.

Step 2: Provide project name as ‘KafkaConnectorExportProject’ & click on Finish.

Step 3: Specify the parent from workspace and select the specific Integration Project you created from the dropdown.

4.3 Add Kafka Connector into Project 

 [Note: Adding Kafka connector libs into the place holder created in last step]

1. Now you need to add the Connector to Connector Exporter Project that you just created. Right click on the Connector Exporter Project and select, New -> Add Remove Connectors ->

2. Add Connector/module -> Next

3. Add from Workspace -> Select Connector

4. Click Workspace.

5. Select Connector & Click ‘OK’.

6. Click ‘Finish’.

7. We can see respective Kafka Connector library files are populated in the project.

4.4 Build the Project Archive: [*.car]

1. Right click on the project name called CompositeApplication and export ‘Carbon Application Archive’ into the wso2mi-                  1.2.0/micro-integrator/repository/deployment/server/carbonapps.

2. Provide a target directory, say “/tmp/*.car”,

3. ‘Select All’ & Click ‘Finish’.

5. Start the Micro Integrator [Run-time]

5.1  Start the MI runtime along with CLI tool  

  1. Navigate to the <MI_HOME>/bin and run the following command.
  2. On MacOS/Linux/CentOS: “sh micro-integrator.sh” & On Windows: “micro-integrator.bat”
  3. But we need to enable the management API while we start WSO2 Micro Integrator instance. So, run “sh micro-integrator.sh –DenableManagementApi”

6. Deployment to Runtime

6.1 Deploy the application

1. Deploy the application by dropping the *.car file in MI deployment folder as shown.

2. MI runtime Console will display the deployment of application is successful.

6.2 Confirm the deployed proxy services are running using the CLI tool

1. Make sure you first export the PATH as below,

$export PATH=/<MI-CLI-HOME>/bin:$PATH

$ export PATH=/opt/WSO2/wso2mi-cli-1.2.0/bin/bin:$PATH

2. Login to the CLI tool,  [Default Credentials: admin/admin]

“./mi remote login” 

3. Check the proxy services.

“./mi proxyservice show”

4. In this scenario, it shows following.

http://SWAPNILs-MacBook-Pro.local:8290/services/KafkaTransport?wsdl

5. [Internally, it tries to connect to https://localhost:9164/management/login]

6. Check the API services.

“./mi api show”

In this scenario, it shows following,http://localhost:8290/publishMessages

7. Now create a topic named “test” with a single partition and only one replica.

Navigate to the <KAFKA_HOME>/bin and run following command,

./kafka-topics.sh –create –bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic test

7. Testing to confirm whether E2E Process Flow works

In this, we will send a message to a Kafka broker via Kafka topics using clients (WSO2 Studio and SOAP-UI) to confirm Message is getting published to Kafka Server.

7.1 Send Client Request using Curl Command

1. Use the CURL command as if you are sending request through client.

curl -X POST -d ‘{“name”:”Hello Kafka World!!”}’

“http://localhost:8290/publishMessages” -H “Content-Type:application/json” -v

2. Then will see the published message on the Kafka server by running the following command,

“bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning”

3. You will get a response as below which confirms E2E message flow is successful,

 {“name”:” Hello Kafka World!!”}

7.2 Send Client request using internal HTTP Client within Studio

1. In Studio, click on ‘Server Configuration’ , to test against embedded MI Server,

Populate the Kafka libraries as shown + Kafka Inbound Endpoint ‘org.apache.synapse.kafka.poll-1.0.10.jar’ from connector store by searching for Kafka.

2. Configure Carbon-Home in ‘Debug Configuration’ screen as shown below,

         In my case, Working Directory ? /opt/WSO2/wso2mi-1.2.0/micro-integrator

 3. Open ‘HTTP Client’ tab available at Console tab, and populate the details and 

        Execute, will receive HTTP 200 response to confirm Service call is successful.

4. Receiver step is covered in next option.

7.3 Send Client request using SOAP UI

1. Check the proxy services,

         “./mi proxyservice show”

2. In this scenario, it shows the following.

http://swapnils-macbook-pro.local:8290/services/WeatherDataPublishService.W……?wsdl

3. Create SOAP Project in SOAP-UI & Configure above WSDL,

4. Populate the payload/body as shown & execute, (You will receive HTTP 200 as success response from service, as shown)

5. And at Topic Consumer screen (using topic: weatherdatatopic), will find the respective message,

       “./kafka-console-consumer.sh –topic weatherdatatopic –bootstrap-server=localhost:9092

8. Conclusions

In the beginning, we did setup Apache Kafka & Kafka Connector with WSO2Then we developed a process flow in WSO2 to publish Message to Kafka Topic. We have made consumer client listening on given Topic at Apache Kafka. We used Client (Curl/HTTP Client in Studio/SOAP-UI) to call WSO2 process flow. Finally, we have confirmed the same message arrival at Kafka consumer who is listening on the same Topic.