Amis Blog

Friends of Oracle and Java

Kafka Streams and NodeJS – Consuming and periodically reporting in Node.JS on the results from a Kafka Streams streaming analytics application

Mon, 2017-02-13 09:22

In several previous articles on Apache Kafka, Kafka Streams and Node.JS, I have described how to create a Node.JS application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic, and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I want to discuss a Node application that consumes the Top-N reports from the Kafka Topic produced to by the Kafka Streams application and periodically (once every X seconds) reports on the current standings.

image

The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.

The Node application uses the npm module kafka-node (https://www.npmjs.com/package/kafka-node) for the interaction with Kafka.

A new Client is created – based on the ZooKeeper connect string (ubuntu:2181/). Using the Client, a Consumer is constructed. The consumer is configured to consume from Topic Top3CountrySizePerContinent. A message handler is associated with the consumer, to handle messages on the topic.

The messages consumed by the Node consumer have the following structure:

{"topic":"Top3CountrySizePerContinent"
,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}"
,"offset":244
,"partition":0
,"key":{"type":"Buffer","data":[65,102,114,105,99,97]}
}

The key of the message is of type Buffer. We happen to know that the key is actually a String (the name of the continent). We can extract the key like this:

var continent = new Buffer(countryMessage.key).toString('ascii');

The payload of the message – the top3 for the continent – is in the value property. It can be extracted easily:

var top3 = JSON.parse(countryMessage.value);

{"nrs":
  [
   {"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"}
  ,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"}
  ,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"}
  ,null
  ]
}

The object countrySizeStandings contains a property for each continent. The property is set equal to the top3 that was most recently consumed from the Kafka Topic Top3CountrySizePerContinent.

countrySizeStandings[continent]=top3;

Using Node's built-in setInterval(), the report() function is scheduled for execution every reportingIntervalInSecs seconds. This function writes the current data in countrySizeStandings to the console.

 

/*

This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.

This program reports: top 3 largest countries per continent (periodically, with a configurable interval) 
*/


var kafka = require('kafka-node')
var Consumer = kafka.Consumer
var client = new kafka.Client("ubuntu:2181/")

var countriesTopic = "Top3CountrySizePerContinent";
var reportingIntervalInSecs = 4;

var consumer = new Consumer(
  client,
  [],
  {fromOffset: true}
);

consumer.on('message', function (message) {
  handleCountryMessage(message);
});

consumer.addTopics([
  { topic: countriesTopic, partition: 0, offset: 0}
], () => console.log("topic "+countriesTopic+" added to consumer for listening"));

var countrySizeStandings = {}; // the global container for the most recent country size standings 

function handleCountryMessage(countryMessage) {
    var top3 = JSON.parse(countryMessage.value);
    // extract key value from the Kafka message
    var continent = new Buffer(countryMessage.key).toString('ascii');
    // record the top3 for the continent indicated by the message key as current standing in the countrySizeStandings object
    countrySizeStandings[continent]=top3;
}// handleCountryMessage

// every reportingIntervalInSecs seconds, report on the current standings per continent
function report() {
   var d = new Date();
   console.log("Report at "+ d.getHours()+":"+d.getMinutes()+ ":"+d.getSeconds());
   // loop over the keys (properties) in the countrySizeStandings map (object)
   for (var continent in countrySizeStandings) {
     if (countrySizeStandings.hasOwnProperty(continent)) {
        var line = continent+ ": ";
        var index = 1;
        countrySizeStandings[continent].nrs.forEach(function(c) {
          if (c) {
            line = line + (index++) +'. '+ c.name+ '('+c.size+'), ';
          }
        });
        console.log(line);
    }//if
  }//for
}//report

// schedule execution of function report at the indicated interval
setInterval(report, reportingIntervalInSecs*1000);

 

Running the end-to-end chain requires a running Kafka Cluster, the Node application that produces the country messages from the CSV file, the Kafka Streams Java application that derives the running Top 3 standings and, finally, the Node application introduced in this article, which consumes the Top 3 standings and reports them to the console (as instructed in the ReadMe in the GitHub Repo):

  • node KafkaCountryProducer.js
  • java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
  • node KafkaCountryStreamsConsumer.js

The KafkaCountryProducer.js Node application writes the messages it produces to Kafka to the console as well:

image

The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:

image

The outcome of the Kafka Streams analysis – as published to the Kafka Topic – is consumed by the Node application continuously, and reported to the console periodically (once every reportingIntervalInSecs seconds), updated with the latest findings:

image

The post Kafka Streams and NodeJS – Consuming and periodically reporting in Node.JS on the results from a Kafka Streams streaming analytics application appeared first on AMIS Oracle and Java Blog.

Connecting Oracle Management Cloud with Oracle Enterprise Manager 13c

Sun, 2017-02-12 10:00

Let’s be clear about this: Oracle Management Cloud (OMC) is NOT a replacement for Oracle Enterprise Manager Cloud Control (OEM CC), or even an equivalent. Rumour has it that this will be Oracle’s policy in a far-away future, but in the meantime we focus on what they do best. OEM CC is a product that offers a complete management solution for your Oracle environment; OMC is for monitoring and, most of all, analysing the monitored data in your Oracle environment.

Oracle made it possible to connect these worlds by using the data from the OEM CC repository in OMC. And that’s what this post is about.

In a previous blog post about monitoring infrastructure with OMC, I installed an OMC cloud agent on a server running OEM CC, with the repository database on it.

Through this OMC cloud agent it’s possible to monitor the assets – in a nice GUI – but what I’d really like to do is use the data in the OEM CC repository for the analytical power of Oracle Management Cloud.

My infrastructure monitoring has been working since that blog post, by installing an OMC cloud agent. The question, however, is: do I have to install an OMC cloud agent on every node, and connect every node to OMC? Partly. A cloud agent is necessary on every node, but they can all be directed to one node where a central gateway has been installed for the connection to OMC. And of course you can also install a data collector for information from the Oracle Enterprise Manager repository.

In the documentation of IT-analytics there’s a picture with quite a nice overview:

image

Another, maybe more explanatory image:

 

image

 

For now I’m interested in the data collector. This data collector collects different types of data from the Oracle Management Repository, including targets, target properties, metrics, performance metrics, and events.

According to the doc I need to install both the gateway and the data collector. In the former post I already downloaded the master installer for Infrastructure Monitoring, and this is the same installer as for the gateway and data collector.

The gateway

For the gateway I chose to use port 1840

Performing the installation in two steps: download first, then install from the staged directory.

./AgentInstall.sh AGENT_TYPE=gateway AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' STAGE_LOCATION=/gwayagent -download_only ---> this may take a few minutes and results in a gateway.zip of approx. 290 MB.

./AgentInstall.sh AGENT_TYPE=gateway AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' AGENT_BASE_DIR=/omc_gway AGENT_PORT=1840 -staged

Generating emaas.properties …
Extracting Agent Software …
Installing the Agent …
Registering the Agent …
Downloading Certificates …
Configuring the Agent …
Cleanup temporary files …
The following configuration scripts need to be executed as the root user
#!/bin/sh
#Root script to run
/omc_gway/core/1.12.0/root.sh

That was quite easy.

The data collector.

Same recipe, first download, then the install.

./AgentInstall.sh AGENT_TYPE=data_collector AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' STAGE_LOCATION=/gwayagent -download_only ---> this results in a lama.zip, the same as for the cloud agent, so in fact there is no need to download again!

 

./AgentInstall.sh AGENT_TYPE=data_collector AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' AGENT_BASE_DIR=/omc_dc GATEWAY_HOST=ovamisux159.amis.local GATEWAY_PORT=1840 EM_AGENT_NAME=ovamisux159.amis.local:1830 HARVESTER_USERNAME=SYSMAN_EMAAS_3 OMR_USERNAME=sys OMR_HOSTNAME=ovamisux159.amis.local OMR_HOST_USER_PASSWORD=welcome1 OMR_PORT=1521 OMR_SERVICE_NAME=cloudcontrol OMR_HOST_USERNAME=oracle OMR_STAGE_DIR=/gwayagent -staged

Enter HARVESTER_USER_PASSWORD:
Enter OMR_USER_PASSWORD:

Generating emaas.properties …
Extracting Agent Software …
Installing the Agent …
Registering the Agent …
Downloading Certificates …
Configuring the Agent …
Cleanup temporary files …
The following configuration scripts need to be executed as the root user
#!/bin/sh
#Root script to run
/omc_dc/core/1.12.0/root.sh

Here I was a bit confused by the use of OMR_USERNAME. The documentation of the agent_install script states that this is "The Oracle Management Repository user name". But this should be a user with SYSDBA privileges, needed to install the Harvester schema – in this case SYSMAN_EMAAS_3.

O.k. done.

A recap: what is running on my server at this moment:

OEM CC – the Oracle Enterprise Manager – Management Server

OEM CC – the Oracle Enterprise Manager repository database 12.1

OEM CC – the Oracle Enterprise Manager agent

OMC – cloud agent

OMC – gateway (agent)

OMC – data collector (agent)

And the data collector is showing up in Oracle Management Cloud:

image

 

Then it’s time to add entities to the IT Analytics data warehouse, according to the doc. This time there is no need for OMCLI commands; now it’s time to return to the GUI of OMC.

Here it’s called ‘Enabling Services’ on Entities.

 

Be aware: only the items that were added through infrastructure monitoring (through OMCLI) can be enabled as a service!

A small documentation bug: I seemed to be missing a piece of functionality, the IT Analytics Administration link.

According to the documentation:

As an IT Analytics administrator, you can add database targets to the IT Analytics warehouse for AWR collection.
To add targets, do the following:
From the Oracle Management Cloud home page, click the Application Navigator icon in the upper-right corner of the page.
From the Administration group, click the IT Analytics Administration link

It turns out this link is no longer called "IT Analytics Administration"; it is just "Administration".
From there, the link "Enable/Disable Services" is used to add entities/targets to the ITA Warehouse.

So… this one:

image

image

image

image

image

And there it is:

image

But no data is visible yet. Is the data collector healthy and well configured?

[oracle@ovamisux159 bin]$ ./omcli status agent
Oracle Management Cloud Data Collector
Copyright (c) 1996, 2016 Oracle Corporation.  All rights reserved.
—————————————————————
Version                : 1.12.0
State Home             : /omc_dc/agent_inst
Log Directory          : /omc_dc/agent_inst/sysman/log
Binaries Location      : /omc_dc/core/1.12.0
Process ID             : 27293
Parent Process ID      : 27235
URL                    :
https://ovamisux159.amis.local:1830/emd/main/
Started at             : 2016-12-30 12:26:01
Started by user        : oracle
Operating System       : Linux version 3.8.13-55.1.6.el7uek.x86_64 (amd64)
Data Collector enabled : true
Sender Status          : FUNCTIONAL
Gateway Upload Status  : FUNCTIONAL
Last successful upload : 2017-01-22 17:03:01
Last attempted upload  : 2017-01-22 17:03:01
Pending Files (MB)     : 0.01
Pending Files          : 16
Backoff Expiration     : (none)

—————————————————————
Agent is Running and Ready

Are there any connectivity issues?

[oracle@ovamisux159 bin]$ ./omcli status agent connectivity -verbose
Oracle Management Cloud Data Collector
Copyright (c) 1996, 2016 Oracle Corporation.  All rights reserved.
—————————————————————
Data Collector is Running

No significant connectivity issues found between Data Collector and Gateway.
Check the connectivity status of the Gateway: ovamisux159.amis.local

Ahh.. forgot one thing:

Following the note "Enabling Host Monitoring for the Infrastructure Monitoring Cloud Service (Doc ID 2195074.1)", I changed the file /omc_dc/plugins/oracle.em.sgfm.zip/1.12.0/configs/discovery.properties, commenting out the line with 'omc_host_linux':

navigate to directory:

cd /u01/app/oracle/omc_dc/plugins/oracle.em.sgfm.zip/1.12.0/configs

open a file for editing in vi

vi discovery.properties

comment out the parameter: disable_monitoring_for_entitytype=omc_host_linux

save the modified file.

Activate the changes by stopping and starting the agent:

./omcli stop agent

./omcli start agent

 

Checking the log file /omc_dc/agent_inst/sysman/log/gcagent.log : no errors.

 

Are there any items that have been added to OMC? When navigating to e.g. Enterprise Health –> Entities, I can see entities that I haven’t added manually to OMC, and that have obviously been added by the OMC data collector.

You may ask why the status is unknown. That’s because the OEM CC agent on my server died while I was having some other problems.

 

image

 

Nice to know: how does this data collector collect its data from the repository? This is recorded in the log file of the OMC data collector; here is an example.

SELECT a.target_guid, a.snapshot_guid, a.snapshot_type, a.target_type, a.start_timestamp,
       c.timezone_region, b.ecm_snapshot_id, b.FILE_SYSTEM "_E_FILE_SYSTEM_"
FROM mgmt_ecm_gen_snapshot a
INNER JOIN mgmt_targets c ON (a.target_guid = c.target_guid)
INNER JOIN emaas_whitelist_entities w
   ON (w.harvester_name = ? AND c.target_guid = w.member_entity_guid)
INNER JOIN em_ecm_all_config_snapshots e
   ON (a.snapshot_guid = e.ecm_snapshot_id
       AND a.start_timestamp = e.collection_timestamp
       AND (   (w.is_new = 0 AND e.saved_timestamp >= ? AND e.saved_timestamp < ?)
            OR (w.is_new = 1 AND e.saved_timestamp < ?) ) )
INNER JOIN em_ecm_all_config_metrics d
   ON (d.ecm_snapshot_id = a.snapshot_guid AND d.table_name = ? AND d.collection_timestamp = e.collection_timestamp)
LEFT OUTER JOIN EM_ESM_FILESYS b ON (a.snapshot_guid = b.ecm_snapshot_id)
WHERE a.target_type = ?
AND a.snapshot_type = ?
AND a.is_current = ?
AND a.target_guid IS NOT NULL
ORDER BY a.target_guid

 

O.k., I clearly reached my goal: OMC is connected with OEM CC. And that’s where it stops for the moment, for this blog post.

But now what? I would have loved to show some more, but time is limited (and so is the trial period). Clicking around in OMC is not very intuitive and you clearly need some time to figure out how to use it and to discover the added value. You also need time to figure out what all the agents on your system are doing, other than consuming resources.

A recap: the concept of OMC is still very promising, but the GUI and the technical implementation need improvement. Then again, the developers of OMC have only just begun, compared with the maturity of OEM CC 13c.

My major concern at the moment is Oracle’s policy. Their cloud eagerness could lead to a decision to replace OEM CC with OMC, while these products could be complementary to each other, even mutually reinforcing.

Regardz.

 

Resources:

– Deploying Oracle Management Cloud for Infrastructure Monitoring: https://technology.amis.nl/2016/12/18/deploying-oracle-management-cloud-for-infrastructure-monitoring/

– Documentation IT Analytics: http://docs.oracle.com/en/cloud/paas/management-cloud/emcad/setting-it-analytics.html

– Oracle Management Cloud Master Note: (Doc ID 2092091.1)

– IT Analytics Cloud Service Master Note (Doc ID 2107732.1)

– Configuring the cloud agent for log analytics on a JCS instance: https://technology.amis.nl/2017/01/20/configuring-cloud-agent-for-log-analytics-on-jcs-instance-to-collect-weblogic-log-files/

– Agent_install script: https://docs.oracle.com/en/cloud/paas/management-cloud/emcad/running-agentinstall-script.html

The post Connecting Oracle Management Cloud with Oracle Enterprise Manager 13c appeared first on AMIS Oracle and Java Blog.

Apache Kafka Streams – Running Top-N Aggregation grouped by Dimension – from and to Kafka Topic

Sat, 2017-02-11 22:02

This article explains how to implement a streaming analytics application using Kafka Streams that performs a running Top N analysis on a Kafka Topic and produces the results to another Kafka Topic. Visualized, this looks like this:

image

Two previous articles are relevant as reference: Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic, and NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file.

This GitHub Repo contains the sources for this article: https://github.com/lucasjellema/kafka-streams-running-topN.

Note that almost all aggregations are specializations of this top-N: min, max, sum, avg and count are all simple top-1 aggregations that can be implemented using a simplified version of this code.
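
For example, a running "largest country per continent" is a top-1 aggregation. Below is a minimal sketch of what that could look like, assuming the CountryMessage class, countriesStream, stringSerde and countryMessageSerde defined in the application listed further down, and using the reduceByKey operation from the same 0.10 API instead of the more general aggregateByKey; the store name and topic name in the sketch are made up for this illustration.

// top-1 variant of the top-N aggregation: keep only the largest country seen so far per continent
KTable<String, CountryMessage> largestPerContinent = countriesStream
    // the dimension for the aggregation: the continent becomes the key of each message
    .selectKey((k, country) -> country.continent)
    // reduce all countries with the same key (continent) to the one with the largest size
    .reduceByKey(
        (currentLargest, candidate) -> candidate.size > currentLargest.size ? candidate : currentLargest,
        stringSerde, countryMessageSerde, "LargestCountryPerContinentStore");

// publish every change of the running top-1 to its own topic
largestPerContinent.to(stringSerde, countryMessageSerde, "LargestCountrySizePerContinent");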

To get started, go through the steps described in my previous article. This will result in an App.java class, that we can flesh out.

package nl.amis.streams.countries;

import nl.amis.streams.JsonPOJOSerializer;
import nl.amis.streams.JsonPOJODeserializer;

// generic Java imports
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
// Kafka imports
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
// Kafka Streams related imports
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.TimeWindows;

public class App {
    static public class CountryMessage {
        /* the JSON messages produced to the countries Topic have this structure:
         { "name" : "The Netherlands"
         , "code" : "NL"
         , "continent" : "Europe"
         , "population" : 17281811
         , "size" : 42001
         };

        this class needs to have at least the corresponding fields to deserialize the JSON messages into
        */

        public String code;
        public String name;
        public int population;
        public int size;
        public String continent;
    }

    static public class CountryTop3 {

        public CountryMessage[] nrs = new CountryMessage[4];

        public CountryTop3() {}
    }

    private static final String APP_ID = "countries-top3-kafka-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Top 3 Demonstration");

        // Create an instance of StreamsConfig from the Properties instance
        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // define countryMessageSerde
        Map<String, Object> serdeProps = new HashMap<String, Object>();
        final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde<CountryMessage> countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        // define countryTop3Serde
        serdeProps = new HashMap<String, Object>();
        final Serializer<CountryTop3> countryTop3Serializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryTop3.class);
        countryTop3Serializer.configure(serdeProps, false);

        final Deserializer<CountryTop3> countryTop3Deserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryTop3.class);
        countryTop3Deserializer.configure(serdeProps, false);
        final Serde<CountryTop3> countryTop3Serde = Serdes.serdeFrom(countryTop3Serializer, countryTop3Deserializer);

        // building Kafka Streams Model
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // the source of the streaming analysis is the topic with country messages
        KStream<String, CountryMessage> countriesStream =
                kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");

        // A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
        // The window's name -- the string parameter -- is used to e.g. name the backing state store.
        long windowSizeMs = 5 * 60 * 1000L;
        long advanceMs = 1 * 60 * 1000L;
        TimeWindows.of("hopping-window-example", windowSizeMs).advanceBy(advanceMs);

        // THIS IS THE CORE OF THE STREAMING ANALYTICS:
        // top 3 largest countries per continent, published to topic Top3CountrySizePerContinent
        KTable<String, CountryTop3> top3PerContinent = countriesStream
            // the dimension for aggregation is continent; assign the continent as the key for each message
            .selectKey((k, country) -> country.continent)
            // for each key value (every continent in the stream) perform an aggregation
            .aggregateByKey(
                // first initialize a new CountryTop3 object, initially empty
                CountryTop3::new
                , // for each country in the continent, invoke the aggregator, passing in the continent, the country element and the CountryTop3 object for the continent
                (continent, countryMsg, top3) -> {
                    // add the new country as the last element in the nrs array
                    top3.nrs[3] = countryMsg;
                    // sort the array by country size, largest first
                    Arrays.sort(
                        top3.nrs, (a, b) -> {
                            // in the initial cycles, not all nrs elements contain a CountryMessage object
                            if (a == null) return 1;
                            if (b == null) return -1;
                            // with two proper CountryMessage objects, do the normal comparison
                            return Integer.compare(b.size, a.size);
                        }
                    );
                    // lose nr 4, only top 3 is relevant
                    top3.nrs[3] = null;
                    return (top3);
                }
                , stringSerde, countryTop3Serde
                , "Top3LargestCountriesPerContinent"
            );
        // publish the Top3 messages to Kafka Topic Top3CountrySizePerContinent
        top3PerContinent.to(stringSerde, countryTop3Serde, "Top3CountrySizePerContinent");

        // prepare Top3 messages to be printed to the console
        top3PerContinent.<String>mapValues((top3) -> {
            String rank = " 1. " + top3.nrs[0].name + " - " + top3.nrs[0].size
                + ((top3.nrs[1] != null) ? ", 2. " + top3.nrs[1].name + " - " + top3.nrs[1].size : "")
                + ((top3.nrs[2] != null) ? ", 3. " + top3.nrs[2].name + " - " + top3.nrs[2].size : "");
            return "List for " + top3.nrs[0].continent + rank;
        })
        .print(stringSerde, stringSerde);

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        // Set a few key parameters
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
        // default serdes for serializing and deserializing key and value from and to streams in case no specific Serde is specified
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
        // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
        // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
        settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return settings;
    }

}

Some special attention for:

  • static class CountryTop3 – custom class to hold the actual top3 for a continent; objects based on this class are passed around in the aggregator, and are produced to the output stream & topic
  • countryTop3Serde – defined for serializing CountryTop3 object to Kafka Topic, using the JsonPOJOSerializer that can translate a Java POJO to a JSON representation (that is subsequently serialized as String to the Kafka Topic)
  • aggregator implementation in a lambda that performs the actual aggregation – the operation aggregateByKey is invoked with an Initializer that returns the initial instance of the CountryTop3 object (per continent) on which the aggregate will be built, an Aggregator that receives the continent, the CountryTop3 object and the next CountryMessage and updates the CountryTop3 object to include the new CountryMessage, the Serdes (serializer/deserializer) for the key and the value, and a String that is the name of the resulting KTable.
  • mapValues in order to create printable strings from the CountryTop3 object – the lambda expression used in the call to mapValues gets a CountryTop3 object as input – the value in the top3PerContinent KTable – and maps it to a String. As a result, the KTable<String,CountryTop3> is mapped to a KTable<String,String> that is streamed to the print operation, using the stringSerde for serializing the two String values.

 

 

To run the application, go through these four command-line steps:

  • (in an empty directory:)
    git clone https://github.com/lucasjellema/kafka-streams-running-topN
  • (navigate to directory kafka-streams-running-topN\Kafka-Streams-Country-TopN)
    mvn package
  • (in the same directory)
    mvn install dependency:copy-dependencies
  • (in the same directory)
    java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

 

 

image

 

Here is a screenshot of the Node.JS application busy producing country messages:

image

And here is some of the output produced by the Kafka Streams application:

image

Note how Mayotte enters at position one for the African continent, only to be quickly relegated by first Mozambique and then Namibia, only to disappear from the running top 3 when the message for Niger is consumed in the stream.

You should also know that, instead of simply pushing every change to the destination topic, we can use timing control – to calculate aggregates over a time slice and/or to produce outcomes only once every so often. I will demonstrate this in a subsequent article.
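
As a rough indication of what that could look like, here is a sketch (untested, reusing countriesStream and stringSerde from the application above, and assuming the windowed countByKey overload of the Kafka Streams 0.10 API; the window name CountryCountWindow is made up; Windowed comes from org.apache.kafka.streams.kstream, as in the imports above):

// a hopping time window of 5 minutes that advances every minute
TimeWindows hoppingWindow = TimeWindows.of("CountryCountWindow", 5 * 60 * 1000L)
                                       .advanceBy(60 * 1000L);

// count countries per continent per time window;
// the key of the resulting KTable is Windowed<String> (continent plus window)
KTable<Windowed<String>, Long> countPerContinentPerWindow = countriesStream
    .selectKey((k, country) -> country.continent)
    .countByKey(hoppingWindow, stringSerde);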

Kafka Tool shows us the topics involved in this article:

image

  • countries is the source, produced to by the Node.js application
  • Top3CountrySizePerContinent is the destination topic for the Kafka Streams application, to which the running Top 3 messages are produced
  • countries-topn-streaming-analysis-app-Top3LargestCountriesPerContinent-changelog is a Topic created by Kafka Streams on the fly as a store for intermediate results; the name of this Topic is derived from the (intermediate) KTable created in the streaming application.

By routing the KTable to a Topic, all change events on the table are produced to the Topic. What I would have liked to be able to do is have only the latest message for each key – in this case the most recent top 3 for each continent – on the Topic. That is not what Kafka Streams does for me, not even when I am producing a KTable as opposed to a KStream. One thing I can do in this case is enable Log Compaction for the topic – although that is more like a hint to the Kafka engine than a strict instruction for removing older messages on the Topic for a key.

Note: the Kafka Streams application makes use of RocksDB – a lightweight, embedded key-value store – to hold intermediate results. RocksDB stores data locally in a directory that can be configured. During development, when you run the same analysis on the same set of test data over and over again, you may get unexpected results, because RocksDB continues with the data it has retained from previous runs. It may be wise to delete the RocksDB local data repository regularly, by just deleting the directory:

image
Resources

An interesting resource is the Kafka Streams example KafkaMusicExample.java on generating a running Top 5 of all songs being played.

A good read is the article Processing Tweets with Kafka Streams https://www.madewithtea.com/processing-tweets-with-kafka-streams.html

The post Apache Kafka Streams – Running Top-N Aggregation grouped by Dimension – from and to Kafka Topic appeared first on AMIS Oracle and Java Blog.

Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic

Sat, 2017-02-11 02:52

Kafka Streams is a lightweight Java library for creating advanced streaming applications on top of Apache Kafka Topics. Kafka Streams provides easy-to-use constructs that allow quick and almost declarative composition by Java developers of streaming pipelines that do running aggregates, real-time filtering, time windows and joining of streams. Results from the streaming analysis can easily be published to a Kafka Topic or to external destinations. Despite the close integration with Kafka and the many out-of-the-box library elements, the application created with Kafka Streams is just a Java application that can be deployed and run wherever Java applications can run (which is of course virtually anywhere).

In this article I will show you my first steps with Kafka Streams. I will create a simple Kafka Streams application that streams messages from a Kafka Topic, processes them after dimensioning them (grouping by a specific key) and then keeps a running count. The running count is produced to a second Kafka Topic (as well as written to the console). Anyone interested in the outcome of this streaming analysis can consume this topic – without any dependency on the Kafka Streams based logic.

This next figure shows the application. Country messages – simple JSON messages that describe a country with properties such as name, continent, population and size – are produced to a Kafka Topic. (This is done from a simple Node.JS application that reads the data from a CSV file and publishes the records to a Kafka Topic – this application is described in this article: NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file.) The Kafka Streams application consists of a single Java Class that creates a stream from the Kafka Topic. Elements in the stream are assigned a key – the continent – and are then counted by key. The result (the running count of countries per continent) is routed to an outbound stream that produces messages to a second Kafka Topic.

image

 

My starting point in this article is:

  • a running Kafka Cluster somewhere on a server (or as is actually the case, in a VM running on my laptop)
  • locally installed Java 8 (1.8.0_72)
  • locally installed Maven (3.2.5)
  • (optional) locally installed Node.js (6.9.4)

image

You will find all sources discussed in this article in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-getting-started .

The steps discussed below:

  • Initialize a new Maven project
  • Add dependencies on Kafka Streams to Maven pom-file
  • Implement Java Application:
    • create KStream using StreamBuilder for Kafka Topic countries
    • selectKey for messages in the KStream: continent
    • countByKey the messages in the KStream
    • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
    • print the running count messages to the console
  • Compile the application and build a JAR file – using mvn package (note: we will do a little tweaking on the exact dependencies)
  • Download all required JAR files needed for running the Kafka Streams application – using mvn install dependency:copy-dependencies
  • optional: run the NodeJS application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages, or use Kafka Connect to bring country messages across to the countries Topic
  • Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven; this will produce messages on the Kafka Topic (which we can inspect, for example using Kafka Tool) and print messages to the console (that are even easier to inspect).

If you want to inspect and perhaps edit the code but not necessarily create the application from scratch, you can clone the GitHub Repo: git clone https://github.com/lucasjellema/kafka-streams-getting-started.

My starting point is the local file system, freshly cloned from a nearly empty GitHub repository. I have included the NodeJS application that will produce the country messages to the Kafka topic:

image

1. Initialize a new Maven project

 

mvn archetype:generate -DgroupId=nl.amis.streams.countries -DartifactId=Kafka-Streams-Country-Counter -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

 

image

The result:

image

 

2. Add dependency on Kafka Streams to the Maven pom-file

The Maven Repo identifier is found from Maven Central: https://search.maven.org

image

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>0.10.0.0</version>
  </dependency>

image
 

3. Implement Java Application
  • create KStream using StreamBuilder for Kafka Topic countries
  • selectKey for messages in the KStream: continent
  • countByKey the messages in the KStream
  • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
  • print the running count messages to the console

image

Or in detail:

 

public class App {
    static public class CountryMessage {
        /* the JSON messages produced to the countries Topic have this structure:
         { "name" : "The Netherlands"
         , "code" : "NL"
         , "continent" : "Europe"
         , "population" : 17281811
         , "size" : 42001
         };
  
        this class needs to have at least the corresponding fields to deserialize the JSON messages into
        */

        public String code;
        public String name;
        public int population;
        public int size;
        public String continent;
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");

        // Create an instance of StreamsConfig from the Properties instance
        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();

        // define countryMessageSerde
        Map < String, Object > serdeProps = new HashMap < > ();
        final Serializer < CountryMessage > countryMessageSerializer = new JsonPOJOSerializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer < CountryMessage > countryMessageDeserializer = new JsonPOJODeserializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde < CountryMessage > countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        // building Kafka Streams Model
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // the source of the streaming analysis is the topic with country messages
        KStream<String, CountryMessage> countriesStream = 
                                       kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");

        // THIS IS THE CORE OF THE STREAMING ANALYTICS:
        // running count of countries per continent, published in topic RunningCountryCountPerContinent
        KTable<String,Long> runningCountriesCountPerContinent = countriesStream
                                                                 .selectKey((k, country) -> country.continent)
                                                                 .countByKey("Counts")
                                                                 ;
        runningCountriesCountPerContinent.to(stringSerde, longSerde,  "RunningCountryCountPerContinent");
        runningCountriesCountPerContinent.print(stringSerde, longSerde);



        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        // Set a few key parameters
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens 
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens 
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
        // default serdes for serializing and deserializing key and value from and to streams in case no specific Serde is specified
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
        // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
        // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
        settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return settings;
    }

Note: the Serde is an object that carries a serializer and a deserializer for a specific data type, used to serialize and deserialize keys and values into and from messages on a Kafka topic. Whenever our Java client consumes or produces elements, the Serde for those elements has to be provided. In this case, I have crafted the countryMessageSerde for the CountryMessage Java Class that is instantiated from a JSON message that is the value of consumed Kafka messages. This Serde carries a serializer and deserializer based on the JsonPOJODeserializer and JsonPOJOSerializer that are generic JSON to Java mappers, using the Jackson library for doing so.
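
The JsonPOJOSerializer and JsonPOJODeserializer classes themselves are not listed in this article. As an illustration, here is a minimal sketch of what such a generic, Jackson-based serializer can look like (modelled on the Confluent Kafka Streams examples; treat it as a sketch rather than the exact source used in the repo):

package nl.amis.streams;

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        // the "JsonPOJOClass" property passed in by the application is only needed by the deserializer
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            // turn the POJO into its JSON representation, as a byte array
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }
}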

 

4. Compile the application and build a JAR file – using mvn package

(note: we will later on do a little tweaking on the exact dependencies and set the correct Java version)

 

Add the following plugin to the Maven pom file, to ensure that compilation is done for Java version 1.8; this is required for the lambda expressions (Java 8) and the use of generics (Java 7) in the code.

...
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

 

 

5. Download all required JAR files needed for running the Kafka Streams application

using mvn install dependency:copy-dependencies

mvn install dependency:copy-dependencies

image

All JAR files that follow from the dependencies defined in the pom.xml file are downloaded to the directory Kafka-Streams-Country-Counter\target\dependency

image

 

6. Produce Country Messages to Kafka Topic

Optional: run the NodeJS application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages, or use Kafka Connect to bring country messages across to the countries Topic.

image

7. Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven

java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

(note: on Linux, the semi colon separating the jar files should be a colon)

 

I ran into several exceptions at this point. I will list them and show the resolutions:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

        … 1 more
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\countries-streaming-analysis-app\0_0\.lock (The system cannot find the path specified)
        at java.io.RandomAccessFile.open0(Native Method)

 

Add the following line in the Java code getProperties() method:

settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\tmp"); // on Windows

or

settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp"); // on Linux

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

See: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo for details on this exception.

Add following line in Java code, method getProperties():

settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

    and the associated import:

    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    Exception in thread "StreamThread-1" java.lang.ExceptionInInitializerError
            at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
            at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
            at org.rocksdb.Options.<clinit>(Options.java:21)
            at org.apache.kafka.streams.state.internals.RocksDBStore.<init>(RocksDBStore.java:126)

    Caused by: java.lang.UnsupportedOperationException
            at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:40)
            at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:19)

     

    This exception can occur on Windows and is caused by the fact that the version of RocksDB that Kafka Streams 0.10.0.0 has a dependency on does not include the required Windows DLL; RocksDB 4.9 does include that DLL.

    Add dependency to Maven pom.xml:

        <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
       <dependency>
          <groupId>org.rocksdb</groupId>
          <artifactId>rocksdbjni</artifactId>
          <version>4.9.0</version>
      </dependency>
    

     

    8. Run the Kafka Streams application

    Run (again)

    • mvn package
    • mvn install dependency:copy-dependencies
    • java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

     

    And now, finally, the running count is produced, both to the console:

    image

    and to the Kafka Topic, here seen in Kafka Tool:

    image

    The messages have a String type Key and a Long type value.

    Note: the topic in the blue rectangle – countries-streaming-analysis-app-Counts-changelog – is created by the Kafka Streams library as an intermediate change log for the running count. Instead of keeping the temporary results only in memory, they are produced to a Kafka Topic as well.

     

    Resources

    On Line Java Beautifier – http://www.tutorialspoint.com/online_java_formatter.htm

    The post Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic appeared first on AMIS Oracle and Java Blog.

    Workshop Apache Kafka – presentation and hands on labs for getting started

    Fri, 2017-02-10 00:09

    The AMIS SIG session on Apache Kafka (9th February 2017) took 25 participants by the hand on a tour of Apache Kafka. Through presentations, demonstrations and a hands-on workshop, we provided a feet-hitting-the-ground-running introduction to Apache Kafka and, as a bonus, Kafka Streams. Responsible for this workshop were Maarten Smeets and Lucas Jellema.

    All materials for the workshop are freely available. The sources and hands-on lab are available in a GitHub Repo: https://github.com/MaartenSmeets/kafka-workshop 

    The workshop discusses Hello World with Kafka, interacting with Kafka from Java and Node.JS, Kafka REST proxy, Kafka Streams, under the hood: partitions, brokers, replication and Kafka integration with Oracle Service Bus and Oracle Stream Analytics.

    image

    The presentation is also available from SlideShare: http://www.slideshare.net/lucasjellema/amis-sig-introducing-apache-kafka-scalable-reliable-event-bus-message-queue

    The post Workshop Apache Kafka – presentation and hands on labs for getting started appeared first on AMIS Oracle and Java Blog.

    Download all directly and indirectly required JAR files using Maven install dependency:copy-dependencies

    Thu, 2017-02-09 00:33

    My challenge is simple: I am creating a small Java application – a single class with a main method – that has many direct and indirect dependencies. In order to run my simple class locally I need to:

    • code the Java Class
    • compile the class
    • run the class

    In order to compile the class, all directly referenced classes from supporting libraries should be available. To run the class, all indirectly invoked classes should also be available. That means that, in addition to the .class file that is the result of compiling my Java code, I need a large number of JAR files.

    Maven is a great mechanism for describing the dependencies of a project. With a few simple XML elements, I can indicate which libraries my application has a direct dependency on. The Maven pom.xml file is where these dependencies are described. Maven uses these dependencies during compilation – to have all direct dependent classes available for the compiler.

    In order to help out with all run time dependencies, Maven also can download all jar-files for the direct and even the indirect dependencies. Take the dependencies in this pom.xml file (for a Java application that will work with Kafka Streams):

     

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>nl.amis.streams.countries</groupId>
      <artifactId>Country-Events-Analyzer</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>Country-Events-Analyzer</name>
      <url>http://maven.apache.org</url>
      <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->  
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.0.0</version>    
        </dependency>
        <dependency>    
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>    
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
        	<artifactId>jackson-databind</artifactId>
        	<version>2.7.4</version>
        </dependency>
         <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>3.8.1</version>
              <scope>test</scope>
         </dependency>
         <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>4.9.0</version>
        </dependency>
      </dependencies>
      <build>
      <plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
        </configuration>
    </plugin>
    </plugins>
    </build>
    </project>
    
    

    The number of JAR files required to eventually run the generated class is substantial. Finding all these JAR files manually is not simple: it may not be easy to determine which files are required, the files may not be easy to locate, and the indirect dependencies (stemming from the JAR files that the application directly depends on) are almost impossible to determine by hand.

    Using a simple Maven instruction, all JAR files are gathered and copied to a designated directory. Before the operation, here is the application. Note that the target directory is empty.

    image

    The statement to use is:

    mvn install dependency:copy-dependencies

    This will instruct Maven to analyze the pom.xml file, find the direct dependencies, find the associated JAR files, determine the indirect dependencies for each of these direct dependencies and process them similarly and recursively.

    image

    after some dozens of seconds:

    image

     

    The JAR files are downloaded to the target/dependency directory:

    image

     

    I can now run my simple application using this command line command, that adds all JAR files to the classpath for the JVM:

    java -cp target/Country-Events-Analyzer-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

    Note: on Linux, the semi colon should be a colon: java -cp target/Country-Events-Analyzer-1.0-SNAPSHOT.jar:target/dependency/* nl.amis.streams.countries.App

    Note: the maven dependencies for specific projects and libraries can be explored in MVNRepository , such as https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams/0.10.0.0 for Kafka Streams.

    The post Download all directly and indirectly required JAR files using Maven install dependency:copy-dependencies appeared first on AMIS Oracle and Java Blog.

    NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file

    Wed, 2017-02-08 23:59

    In a recent article I described how to implement a simple Node.JS program that reads and processes records from a delimiter-separated file. That is a stepping stone on the way to my real goal: publish a load of messages on a Kafka Topic, based on records in a file, and semi-randomly spread over time.

    In this article I will use the stepping stone and extend it:

    • read all records from CSV file into a memory array
    • create a Kafka Client and Producer using Node module kafka-node
    • process one record at a time, and when done schedule the next cycle using setTimeout with a random delay
    • turn each parsed record into an object and publish the JSON stringified representation to the Kafka Topic

    image

    The steps:

    1. npm init kafka-node-countries

    2. npm install csv-parse --save

    3. npm install kafka-node --save

    4. Implement KafkaCountryProducer.js

     

    /*
    This program reads and parses all lines from the csv file countries2.csv into an array (countriesArray) of arrays; each nested array represents a country.
    The initial file read is synchronous. The country records are kept in memory.
    After the initial read is performed, a function is invoked to publish a message to Kafka for the first country in the array. This function then uses a timeout with a random delay
    to schedule itself to process the next country record in the same way. Depending on how the delays pan out, this program will publish country messages to Kafka roughly every 3 seconds for about 10 minutes.
    */
    
    var fs = require('fs');
    var parse = require('csv-parse');
    
    // Kafka configuration
    var kafka = require('kafka-node')
    var Producer = kafka.Producer
    // instantiate client with as connectstring host:port for  the ZooKeeper for the Kafka cluster
    var client = new kafka.Client("ubuntu:2181/")
    
    // name of the topic to produce to
    var countriesTopic = "countries";
    
        KeyedMessage = kafka.KeyedMessage,
        producer = new Producer(client),
        km = new KeyedMessage('key', 'message'),
        countryProducerReady = false ;
    
    producer.on('ready', function () {
        console.log("Producer for countries is ready");
        countryProducerReady = true;
    });
     
    producer.on('error', function (err) {
      console.error("Problem with producing Kafka message "+err);
    })
    
    
    var inputFile='countries2.csv';
    var averageDelay = 3000;  // in milliseconds
    var spreadInDelay = 2000; // in milliseconds
    
    var countriesArray ;
    
    var parser = parse({delimiter: ';'}, function (err, data) {
        countriesArray = data;
        // when all countries are available,then process the first one
        // note: array element at index 0 contains the row of headers that we should skip
        handleCountry(1);
    });
    
    // read the inputFile, feed the contents to the parser
    fs.createReadStream(inputFile).pipe(parser);
    
    // handle the current country record
    function handleCountry( currentCountry) {   
        var line = countriesArray[currentCountry];
        var country = { "name" : line[0]
                      , "code" : line[1]
                      , "continent" : line[2]
                      , "population" : line[4]
                      , "size" : line[5]
                      };
         console.log(JSON.stringify(country));
         // produce country message to Kafka
         produceCountryMessage(country)
         // schedule this function to process the next country after a random delay between averageDelay - spreadInDelay/2 and averageDelay + spreadInDelay/2
         var delay = averageDelay + (Math.random() - 0.5) * spreadInDelay;
         //note: use bind to pass in the value for the input parameter currentCountry     
         setTimeout(handleCountry.bind(null, currentCountry+1), delay);             
    }//handleCountry
    
    function produceCountryMessage(country) {
        // create a keyed message (key = country code, value = JSON representation of the country)
        var countryKM = new KeyedMessage(country.code, JSON.stringify(country)),
            payloads = [
                { topic: countriesTopic, messages: countryKM, partition: 0 },
            ];
        if (countryProducerReady) {
            producer.send(payloads, function (err, data) {
                if (err) console.error("Problem sending country message to Kafka: " + err);
                else console.log(data);
            });
        } else {
            // the exception handling can be improved, for example by scheduling this message to be tried again later on
            console.error("sorry, CountryProducer is not ready yet, failed to produce message to Kafka.");
        }
    
    }//produceCountryMessage
    

    5. Run node KafkaCountryProducer.js
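    As the comment in produceCountryMessage indicates, the exception handling can be improved. A minimal sketch of one possible improvement, which simply retries a message for which the producer was not yet ready (the function name produceCountryMessageWithRetry and the retry interval of 1000 ms are my own choices, not part of the program above):

    function produceCountryMessageWithRetry(country) {
        var countryKM = new KeyedMessage(country.code, JSON.stringify(country));
        var payloads = [{ topic: countriesTopic, messages: countryKM, partition: 0 }];
        if (countryProducerReady) {
            producer.send(payloads, function (err, data) {
                if (err) console.error("Failed to produce message for " + country.code + ": " + err);
                else console.log(data);
            });
        } else {
            // producer not ready yet: retry this same country in a second instead of dropping the message
            console.warn("CountryProducer not ready yet, retrying " + country.code + " in 1 second");
            setTimeout(produceCountryMessageWithRetry.bind(null, country), 1000);
        }
    }//produceCountryMessageWithRetry

    This sketch relies on the client, producer, KeyedMessage and countriesTopic variables that are already defined in KafkaCountryProducer.js.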

    The post NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file appeared first on AMIS Oracle and Java Blog.

    NodeJS – reading and processing a delimiter separated file (csv)

    Wed, 2017-02-08 23:34

    Frequently, there is a need to read data from a file, process it and route it onwards. In my case, the objective was to produce messages on a Kafka Topic. However, regardless of the objective, the basic steps of reading the file and processing its contents are required often. In this article I show the very basic steps with Node.js and the Node module csv-parse.

    1. npm init process-csv

    Enter a small number of details in the command line dialog. Shown in blue:

    image

    2. npm install csv-parse --save

    This will install Node module csv-parse. This module provides processing of delimiter separated files.

    image

    This also extends the generated file package.json with a reference to csv-parse:

    image

    3. Implement file processFile.js

    The logic to read records from a csv file and do something with each record (here: write it to the console) is very straightforward. In this example, I will read data from the file countries2.csv, a file with records for all countries in the world (courtesy of https://restcountries.eu/).

    image

    The fields are semicolon separated; each record is on a new line.

     

    /*
    This program reads and parses all lines from the csv file countries2.csv into an array (countriesArray) of arrays; each nested array represents a country.
    The initial file read is synchronous. The country records are kept in memory.
    */
    
    var fs = require('fs');
    var parse = require('csv-parse');
    
    var inputFile='countries2.csv';
    console.log("Processing Countries file");
    
    var parser = parse({delimiter: ';'}, function (err, data) {
        if (err) { console.error("Failed to parse " + inputFile + ": " + err); return; }
        // when all countries are available, then process them
        // note: array element at index 0 contains the row of headers, so skip it
        data.slice(1).forEach(function(line) {
          // create country object out of parsed fields
          var country = { "name" : line[0]
                        , "code" : line[1]
                        , "continent" : line[2]
                        , "population" : line[4]
                        , "size" : line[5]
                        };
         console.log(JSON.stringify(country));
        });    
    });
    
    // read the inputFile, feed the contents to the parser
    fs.createReadStream(inputFile).pipe(parser);
    

     

    4. Run the file with node processFile.js:

    image
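    Note that the approach above hands the complete, fully parsed file to the callback in one go. For larger files, csv-parse can also be consumed as a stream, handling one record at a time as it becomes available. A minimal sketch of that variant, under the same assumptions as above (semicolon-delimited countries2.csv with a header row; the isHeader flag and log messages are mine):

    var fs = require('fs');
    var parse = require('csv-parse');
    
    var inputFile = 'countries2.csv';
    var parser = parse({delimiter: ';'});
    var isHeader = true; // the first record contains the column headers
    
    parser.on('readable', function () {
        var line;
        while ((line = parser.read())) {
            if (isHeader) { isHeader = false; continue; } // skip the header row
            var country = { "name" : line[0]
                          , "code" : line[1]
                          , "continent" : line[2]
                          , "population" : line[4]
                          , "size" : line[5]
                          };
            console.log(JSON.stringify(country));
        }
    });
    parser.on('error', function (err) { console.error("Parse error: " + err.message); });
    parser.on('end', function () { console.log("Done processing " + inputFile); });
    
    // read the inputFile, feed the contents to the parser
    fs.createReadStream(inputFile).pipe(parser);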

    The post NodeJS – reading and processing a delimiter separated file (csv) appeared first on AMIS Oracle and Java Blog.

    Oracle Service Bus: Produce messages to a Kafka topic

    Mon, 2017-02-06 04:04

    Oracle Service Bus is a powerful tool that provides features like transformation, throttling and virtualization for messages coming from different sources. There is a (recently opensourced!) Kafka transport available for Oracle Service Bus (see here). Oracle Service Bus can thus be used to do all kinds of interesting things to messages coming from Kafka topics. You can then produce the altered messages to other Kafka topics and create a decoupled processing chain. In this blog post I provide an example of how to use Oracle Service Bus to produce messages to a Kafka topic.

    Messages from Service Bus to Kafka

    First perform the steps as described here to setup the Service Bus with the Kafka transport. Also make sure you have a Kafka broker running.

    Next create a new Business Service (File, New, Business Service). It is not visible in the component palette since it is a custom transport. Then select the Kafka transport.


    In the Type screen be sure to select Text as request message and None as response message.


    Specify a Kafka bootstrap broker.


    The body needs to be of type {http://schemas.xmlsoap.org/soap/envelope/}Body. If you send plain text as the body to the Kafka transport, you will get the below error message:

    <Error> <oracle.osb.pipeline.kernel.router> <ubuntu> <DefaultServer> <[STUCK] ExecuteThread: '22' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <43b720fd-2b5a-4c93-073-298db3e92689-00000132> <1486368879482> <[severity-value: 8] [rid: 0] [partition-id: 0] [partition-name: DOMAIN] > <OSB-382191> <SBProject/ProxyServicePipeline: Unhandled error caught by system-level error handler: com.bea.wli.sb.pipeline.PipelineException: OSB Assign action failed updating variable "body": [OSB-395105]The TokenIterator does not correspond to a single XmlObject value
    

    If you send XML as the body of the message going to the transport but not an explicit SOAP body, you will get errors in the server log like below:

    <Error> <oracle.osb.pipeline.kernel.router> <ubuntu> <DefaultServer> <[STUCK] ExecuteThread: '22' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <43b720fd-2b5a-4c93-a073-298db3e92689-00000132> <1486368987002> <[severity-value: 8] [rid: 0] [partition-id: 0] [partition-name: DOMAIN] > <OSB-382191> <SBProject/ProxyServicePipeline: Unhandled error caught by system-level error handler: com.bea.wli.sb.context.BindingLayerException: Failed to set the value of context variable "body". Value must be an instance of {http://schemas.xmlsoap.org/soap/envelope}Body.
    

    As you can see, this causes stuck threads. In order to get a {http://schemas.xmlsoap.org/soap/envelope/}Body you can for example use an Assign activity. In this case I’m replacing text in the input body and assigning the result to the output body. I’m using <ns:Body xmlns:ns='http://schemas.xmlsoap.org/soap/envelope/'>{fn:replace($body,'Trump','Clinton')}</ns:Body>. This replaces Trump with Clinton.


    When you check the output with a tool such as KafkaTool, you can see that the SOAP body is not propagated to the Kafka topic.

    Finally

    Oracle Service Bus processes individual messages. If you want to aggregate data or perform analytics on several messages, you can consider using Oracle Stream Analytics (OSA). It also has pattern recognition and several other interesting features. It is, however, not very suitable for splitting up messages or performing more complicated actions on individual messages, such as transformations. For such a use case, use Oracle Service Bus.

    The post Oracle Service Bus: Produce messages to a Kafka topic appeared first on AMIS Oracle and Java Blog.

    How About Oracle Database 12c Threaded_Execution

    Sun, 2017-02-05 13:53

    THREADED_EXECUTION
    Threaded_Execution is an Oracle Database 12c feature aiming to reduce the number of Oracle processes on LINUX. After setting the parameter THREADED_EXECUTION to TRUE and bouncing the database, most of the background processes run as threads within just 6 Oracle processes, where more than 60 processes existed before the bounce. And if you want to apply this to client processes as well, just add DEDICATED_THROUGH_BROKER_LISTENER = ON to the listener.ora and reload the listener.
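    As a minimal sketch, enabling the feature could look like this (threaded_execution is a static parameter, so a bounce is needed; LISTENER in the listener.ora entry is the name of the listener):

    -- set the static parameter in the spfile, then bounce the database
    ALTER SYSTEM SET threaded_execution = TRUE SCOPE = SPFILE;
    SHUTDOWN IMMEDIATE
    STARTUP
    
    -- listener.ora addition, so dedicated client connections are created as threads as well:
    --   DEDICATED_THROUGH_BROKER_LISTENER = ON
    -- followed by a "lsnrctl reload"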

    Especially within a consolidated environment, and when coping with applications that just can’t restrict their connection pool activity and overload the database with sessions, this feature is very welcome. LINUX is better off with fewer Oracle processes, the communication between threads within a process is faster and more efficient than between processes, and logins and logoffs of client sessions as threads instead of processes are faster and less stressful for Oracle. Just to make sure you don’t mistake this for the shared server option… every session is still a dedicated session, not a shared one. What follows is a summary of issues I encountered when I implemented this feature within databases on an ODA X5-2.

    SEPS
    Threaded_execution is a medicine that comes with an annoying side effect… database login with OS authentication is no longer possible. Quite an issue, because most of my scripts use the “/ as sysdba” login; am I from now on forced to use “sys/password as sysdba”? Well, this presented me with more of an opportunity than a problem, because now I could implement SEPS (Secure External Password Store), an Oracle wallet in which to keep unique combinations of TNS alias, username and password. I don’t find this a particularly user-friendly Oracle tool, but it does the job and enables a passwordless login with “/@TNS-alias as sysdba”. If you want some more information on SEPS, see here. Its main aim is to prevent hard-coded passwords in code and scripts.
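    A rough sketch of such a wallet setup (the wallet directory /u01/app/oracle/wallet and the TNS alias MYDB are just example values, not taken from the environment described here):

    # create the wallet and store a credential for the TNS alias MYDB
    mkstore -wrl /u01/app/oracle/wallet -create
    mkstore -wrl /u01/app/oracle/wallet -createCredential MYDB sys <sys_password>
    
    # sqlnet.ora: point the client at the wallet and allow it to supply the credentials
    #   WALLET_LOCATION = (SOURCE = (METHOD = FILE)(METHOD_DATA = (DIRECTORY = /u01/app/oracle/wallet)))
    #   SQLNET.WALLET_OVERRIDE = TRUE
    
    # after which the passwordless login works:
    sqlplus /@MYDB as sysdba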

    JDBC THIN
    With threaded_execution also enabled for client sessions, some jdbc thin applications were not able to log in.
    Cause: Bug 19929111 : 11.1 JDBC THIN CAN’T CONNECT TO A 12C D/B WITH THREADED_EXECUTION=TRUE
    Solved by upgrading the antique JDBC driver (11.1.0.7) to 11.2.0.3 (higher is also OK).

    Resident Memory
    The Linux server rebooted spontaneously after running out of memory and having to swap.
    Cause: Bug 22226365 : THREADED_EXECUTION=TRUE – SCMN PROCESS RES MEMORY INCREASE
    Solved by applying the patch.

    Shared Pool
    PGA memory seems to be allocated from the Shared Pool. I can’t find any mention of this in the Oracle docs, so I’m not stating it as a fact… it may or may not change in future releases, but to be on the safe side, until Oracle documents this, I will presume that in combination with threaded_execution the PGA is allocated from the Shared Pool.
    Action: for some databases I doubled or even quadrupled the SGA.

    Big Table Caching
    This 12c feature enables you to set apart a certain percentage of the buffer cache for the caching of “full_table_scanned” tables. By caching these tables in memory (in part or in full), you can boost application performance considerably where such scans constituted a performance bottleneck before.
    Action: for some databases I doubled or even quadrupled the SGA.
    This is not directly related to threaded_execution, but it’s nice to know that the action taken to accommodate PGA in the shared pool also affects the size of the buffer cache, making room for the implementation of big_table_caching.
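    A one-line sketch of enabling it (the percentage is just an example value):

    -- reserve 20% of the buffer cache for caching full-table-scanned ("big") tables
    ALTER SYSTEM SET db_big_table_cache_percent_target = 20;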

    Datapatch
    Datapatch is run, or should be run, as part of Oracle patching. As mentioned before, threaded_execution doesn’t allow “/ as sysdba” connections, but datapatch cannot do without them.
    Solved by setting threaded_execution=false and bouncing the database, then setting threaded_execution=true after datapatch and bouncing again.
    As of the “160719” patch this seems to be resolved… if datapatch.sql can’t log in with “/ as sysdba”, this is recognized and the script will ask for the sys password.

    ODA Update
    Every 3 months the ODA X5-2 must be updated.
    At the moment we are at the latest version 2.9 (patch 161018), and the part that updates the Oracle Home(s) still can’t cope with threaded_execution. This is solved by setting threaded_execution=false and bouncing the database(s), then setting threaded_execution=true after the update and bouncing again.

    The glogin.sql script we use interferes with the ODA update.
    Solved by renaming the file before the ODA update, and renaming it back to its original name after the ODA update.

    The Oracle Database 12c Patch for Bug 22226365 : THREADED_EXECUTION=TRUE – SCMN PROCESS RES MEMORY INCREASE does not interfere with the 2.9 ODA update.

    After all of this
    Am I still as enthusiastic as before about this medicine… yes, but please solve the main side effect of not being able to use database login with OS authentication. It is not so much a problem for me (see the SEPS paragraph) but more so for Oracle itself, because some software (like the ODA Update) is not prepared for this configuration (yet).

    The post How About Oracle Database 12c Threaded_Execution appeared first on AMIS Oracle and Java Blog.
