Amis Blog

Friends of Oracle and Java

Introducing NoSQL and MongoDB to Relational Database professionals

Wed, 2017-03-15 06:25

Most enterprises deal with a wide variety of data. Some data is highly structured and other data is largely unstructured; some is bound by strict integrity rules and quality constraints and other data is free of any restrictions; some data is “hot” (currently very much in demand) and other data can be stone cold. Some data needs to be extremely accurate, down to a prescribed number of fractional digits, and other data is only approximate. Some is highly confidential and other data is publicly accessible. Some exists in small quantities and other data in huge volumes.

Over the years many IT professionals and companies have come to the realization that all this differentiation in data justifies or even mandates a differentiation in how the data is stored and processed. It does not make sense to treat the hottest transactional data in the same way as archived records from 30 years ago. Yet many organizations have been doing exactly that: storing it all in the enterprise relational database. It works, it keeps all data accessible for those rare instances where that really old data is required and, most importantly, it keeps all data accessible in the same way: through straightforward SQL queries.

On March 14th, we organized a SIG session at AMIS around NoSQL in general and MongoDB in particular. We presented on the history of NoSQL, how it complements relational databases and a pure SQL approach and what types of NoSQL databases are available. Subsequently we focused on MongoDB, introducing the product and its architecture and discussing how to interact with MongoDB from JavaScript/NodeJS and from Java.

The slides presented by the speakers – Pom Bleeksma and Lucas Jellema – are shown here (from SlideShare):

 

The hands-on workshop is available in full on GitHub: https://github.com/lucasjellema/sig-nosql-mongodb.

image

An additional slide deck was discussed – to demonstrate 30 queries side by side, against MongoDB vs Oracle Database SQL. This slide deck includes the MongoDB operations:

  • Filter & Sort (find, sort)
  • Aggregation ($group, $project, $match, $sort)
  • Lookup & Outer Join ($lookup, $arrayElemAt)
  • Facet Search ($facet, $bucket, $sortByCount)
  • Update (findAndModify, forEach, save, update, upsert, $set, $unset)
  • Date and Time operations
  • Materialized View ($out)
  • Nested documents/tables ($unwind, $reduce)
  • Geospatial (ensureIndex, 2dsphere, $near, $geoNear)
  • Text Search (createIndex, text, $text, $search)
  • Stored Procedures (db.system.js.save, $where)

 

The post Introducing NoSQL and MongoDB to Relational Database professionals appeared first on AMIS Oracle and Java Blog.

Oracle Public Cloud – Invoking ICS endpoints from SOA CS – configure SSL certificate and basic authentication

Wed, 2017-03-15 06:16

As part of the Soaring through the Clouds demo of 17 Oracle Public Cloud services, I had to integrate SOA CS with both ACCS (Application Container Cloud) and ICS (Integration Cloud Service).

image

Calls from Service Bus and SOA Composites running in SOA Suite 12c on SOA CS to endpoints on ACCS (Node.js Express applications) and ICS (REST connector endpoint) were required in this demo. These calls are over SSL (to https endpoints) and for ICS also require basic authentication (at present, ICS endpoints cannot be invoked anonymously).

This article shows the steps for taking care of these two aspects:

  • ensure that the JVM under SOA Suite on SOA CS knows and trusts the SSL certificate for ACCS or ICS
  • ensure that the call from SOA CS to ICS carries basic authentication details

The starting point is a SOA Composite that corresponds with the preceding figure – with external references to DBaaS (through Database Adapter), ICS (to call an integration that talks to Twitter) and ACCS (to invoke a REST API on NodeJS that calls out to the Spotify API):

image

Configure SSL Certificate on JVM under SOA Suite on SOA CS

I deployed the SOA composite (successfully) and then tried to invoke the TweetServiceSOAP endpoint, which in turn invokes ICS (unsuccessfully). The first error I ran into is:

env:Server
javax.ws.rs.ProcessingException: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
oracle.sysman.emInternalSDK.webservices.util.SoapTestException: Client received SOAP Fault from server : javax.ws.rs.ProcessingException: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

image

This may sound a little cryptic, but it is actually quite simple: the endpoint for the ICS service I am trying to invoke is: https://ics4emeapartner-partnercloud17.integration.us2.oraclecloud.com/integration/flowapi/rest/ACEDEM_RESTME_… The essential part is right at the beginning: https. The communication with the endpoint is secure, over SSL. This requires the certificate of the ICS server to be trusted by SOA CS (in particular by the JVM under WebLogic running SOA Suite on the SOA CS instance). For this to happen, the certificate needs to be registered with the JVM as a trusted certificate.

With WebLogic 12c it has become a lot easier to register certificates with the server – going through the Enterprise Manager Fusion Middleware Control. These are the steps:

1. Paste the endpoint for the ICS service in the browser’s location bar and try to access it; this will not result in a meaningful response. It will however initiate an SSL connection between browser and server, as you can tell from the padlock icon displayed to the left of the location bar

image

2. Click on the padlock icon, to open the details for the SSL certificate

image

Open the Security tab and click on View Certificate

image

3. Open the Details tab and Export the Certificate

image

Save the certificate to a file:

image

4. Open the Enterprise Manager Fusion Middleware Control for the WebLogic Domain under the SOA CS instance. Navigate to Security | Keystore:

image

5. Select Stripe system | trust and click on the Manage button

image

6. Click on Import to import a new certificate:

image

Select Trusted Certificate as the Certificate Type. Provide an alias to identify the certificate.

Click browse and select the file that was saved when exporting the certificate in step 3:

image

Click OK.

The Certificate is imported and added to the keystore:

image

7. Restart the WebLogic Domain (admin server and all managed servers)

Unfortunately, for the new certificate to become truly available, a restart is (still) required. At least, that is my understanding; you may want to try without a restart first, because it is a very heavy step.

This blog by Adam DesJardin from our REAL partner AVIO Consulting provided much of the answer: http://www.avioconsulting.com/blog/soa-suite-12c-and-opss-keystore-service

 

Add basic authentication to the call from SOA CS to ICS

When I again tested my call to the TweetServiceSOAP endpoint (that invokes ICS), I was again not successful. This time, a different exception occurred:

env:Server
Authorization Required
oracle.sysman.emInternalSDK.webservices.util.SoapTestException: Client received SOAP Fault from server : Authorization Required

This is not really a surprise: all calls to ICS endpoints require basic authentication (because at present, ICS endpoints cannot be invoked anonymously). These are the steps to make this successful:

1. Create an Oracle Public Cloud user account (here: johndoe) with a single permission: calling ICS services.

Now we need to create a credential for johndoe in a credential map in the credential store in WebLogic, and refer to that credential in an OWSM security policy that we add to the Reference in the SOA Composite that makes the call to ICS.

2. Open the Enterprise Manager Fusion Middleware Control for the WebLogic Domain under the SOA CS instance. Navigate to Security | Credentials:

image

3. If the map oracle.wsm.security does not yet exist, click on Create Map. Enter the name oracle.wsm.security in the Map Name field and click on OK.

image

4. Select the map oracle.wsm.security and click on Create Key

image

Set the Key for this credential; the key is used to refer to the credential in the security policy. Here I use ICSJohnDoe.

image

Set the Type to Password and enter the username and password of the ICS user. Click on OK to create the credential.

image

5. Add a security policy to the Reference in the SOA Composite.

In JDeveloper open the SOA Composite. Right click on the Reference. Select Configure SOA WS Policies from the context menu.

image

Click on the plus icon in the category Security. Select oracle/http_basic_auth_over_ssl_client_policy.

image

Set the value of property csf-key to the Key value defined for the credential in step 4, in my case ICSJohnDoe.

Click on OK.

6. Redeploy the SOA Composite to SOA CS.
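
To make concrete what the policy adds to the outgoing request: HTTP basic authentication sends an Authorization header whose value is the word Basic followed by the Base64 encoding of username:password. The sketch below builds that value in plain Java. It is not how OWSM is implemented; the class name, the method name and the password "secret" are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; not part of OWSM or SOA Suite.
final class BasicAuthHeader {

    // Returns the value of the HTTP Authorization header for basic authentication.
    static String valueFor(String username, String password) {
        String pair = username + ":" + password;
        String encoded = Base64.getEncoder()
                               .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // "secret" is a placeholder password, not a real credential.
        System.out.println("Authorization: " + valueFor("johndoe", "secret"));
    }
}
```

Base64 is an encoding, not encryption, so anyone who can read the header can read the password. That is presumably why the policy selected in step 5 is the over_ssl variant.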

 

This time when I invoke the Web Service, my Tweet gets published:

image

The flow trace for the SOA Composite:

image

Resources

A-Team Article – add certificate to JCS and invoke JCS from ICS – http://www.ateam-oracle.com/configuring-https-between-integration-cloud-service-and-java-cloud-service/


    Change UUIDs in VirtualBox

    Sun, 2017-03-12 00:31

    If you are anything like me you will have multiple VirtualBox machines on your system. Sometimes you might want to run a copy of a VirtualBox machine for a different purpose, for example running an Oracle 11 Devdays instance as a test environment while also running the same vbox for customer testing. If you copy the vbox and try to run it in the manager, you’ll be presented with an error that a hard disk with the same UUID already exists. Here’s how I solved it.

    First of all, make a backup copy of the VirtualBox machine you want to change. While this is running you can download the portable apps UUID-GUID generator or, if you are not running Windows, a similar program. You can also use an online GUID generator.

    After the backup has completed you can start changing the UUIDs for the VirtualBox. Open the <virtualboxname>.vbox file in a text editor. There are a couple of UUIDs that need to be changed:

    First look for the <Machine> tag (2nd tag in the xml file). One of the attributes is uuid={some_uuid}. You can change this to your new uuid. This is where the generator comes in, just generate a new uuid and paste that here.

    Next you need to change the UUIDs of the hard disks. This is a little trickier. Find the <HardDisk> tag and look for its uuid attribute. This UUID is used multiple times in the XML file, including in the StorageControllers section. The easiest way to keep these in sync is a search-and-replace over the entire file: search for the current UUID and replace it with a freshly generated one. Before you move on to the next hard disk, you also need to change the UUID in the hard disk file itself. You do this by running the command line utility VBoxManage.
    The command looks like this:
    <path_to_virtualbox>VBoxManage internalcommands sethduuid <filepath> <uuid>

    Repeat this process for all the harddisks that are defined. This way you can have multiple instances of the same VirtualBox in your VirtualBox Manager.
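
The search-and-replace step can also be scripted. The following is a minimal sketch in Java, assuming the .vbox content has already been read into a string; the class name, the old UUID and the XML fragment are made up for illustration, and the sketch only manipulates text (it does not call VirtualBox).

```java
import java.util.UUID;

// Hypothetical helper for illustration; it does not touch any VirtualBox files.
final class VboxUuidReplacer {

    // Replaces every occurrence of oldUuid in the .vbox XML text with newUuid.
    static String replaceUuid(String vboxXml, String oldUuid, String newUuid) {
        return vboxXml.replace(oldUuid, newUuid);
    }

    public static void main(String[] args) {
        // Made-up UUID and a simplified, made-up fragment of a .vbox file.
        String oldUuid = "11111111-2222-3333-4444-555555555555";
        String xml = "<HardDisk uuid=\"{" + oldUuid + "}\" location=\"disk1.vdi\"/>\n"
                   + "<Image uuid=\"{" + oldUuid + "}\"/>";

        // java.util.UUID can stand in for an external UUID generator.
        String newUuid = UUID.randomUUID().toString();

        System.out.println(replaceUuid(xml, oldUuid, newUuid));
        // The hard disk file itself still has to be updated with VBoxManage:
        System.out.println("VBoxManage internalcommands sethduuid <filepath> " + newUuid);
    }
}
```

Replacing on the whole text keeps all references to the same disk in sync, which is the point of doing a file-wide search-and-replace rather than editing each attribute by hand.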

    You may want to change other settings like MAC Addresses for your network cards, but you can do this using the VBox interface.


    Oracle Service Bus : Service Exploring via WebLogic Server MBeans with JMX

    Thu, 2017-03-09 03:34

    At a public sector organization in the Netherlands there was a need to make an inventory of the deployed OSB services, in order to find out their dependencies on certain external web services (which were on a list to become deprecated).

    For this, the endpoints of business services in particular were of interest.

    Besides that, the dependencies between services and the Message Flow per proxy service were also of interest, in particular Operational Branch, Route, Java Callout and Service Callout actions.

    Therefore an OSBServiceExplorer tool was developed to explore the services (proxy and business) within the OSB via WebLogic Server MBeans with JMX. For now, this tool was merely used to quickly return the information needed, but in the future it can be the basis for a more comprehensive one.

    This article will explain how the OSBServiceExplorer tool uses WebLogic Server MBeans with JMX.

    If you are interested in general information about using MBeans with JMX, I kindly point you to another article (written by me) on the AMIS TECHNOLOGY BLOG: “Oracle Service Bus : disable / enable a proxy service via WebLogic Server MBeans with JMX”, via url: https://technology.amis.nl/2017/02/28/oracle-service-bus-disable-enable-a-proxy-service-via-weblogic-server-mbeans-with-jmx/

    Remark: Some names in the examples in this article are in Dutch, but don’t let this scare you off.

    MBeans

    For ease of use, a ms-dos batch file was created, using MBeans, to explore services (proxy and business). The WebLogic Server contains a set of MBeans that can be used to configure, monitor and manage WebLogic Server resources.

    On a server, the ms-dos batch file “OSBServiceExplorer.bat” is called.

    The content of the ms-dos batch file “OSBServiceExplorer.bat” is:
    java.exe -classpath "OSBServiceExplorer.jar;com.bea.common.configfwk_1.7.0.0.jar;sb-kernel-api.jar;sb-kernel-impl.jar;wlfullclient.jar" nl.xyz.osbservice.osbserviceexplorer.OSBServiceExplorer "xyz" "7001" "weblogic" "xyz"

    In the ms-dos batch file via java.exe a class named OSBServiceExplorer is being called. The main method of this class expects the following parameters:

    Parameter name    Description
    HOSTNAME          Host name of the AdminServer
    PORT              Port of the AdminServer
    USERNAME          Username
    PASSWORD          Password

    In the sample code shown at the end of this article, the use of the following MBeans can be seen:

    DomainRuntimeServiceMBean: Provides a common access point for navigating to all runtime and configuration MBeans in the domain as well as to MBeans that provide domain-wide services (such as controlling and monitoring the life cycles of servers and message-driven EJBs and coordinating the migration of migratable services). [https://docs.oracle.com/middleware/1213/wls/WLAPI/weblogic/management/mbeanservers/domainruntime/DomainRuntimeServiceMBean.html]

    The client library wlfullclient.jar is not provided by default in a WebLogic install and must be built. The simple way to do this is described in “Fusion Middleware Programming Stand-alone Clients for Oracle WebLogic Server, Using the WebLogic JarBuilder Tool”, which can be reached via url: https://docs.oracle.com/cd/E28280_01/web.1111/e13717/jarbuilder.htm#SACLT240.

    ServerRuntimeMBean: Provides methods for retrieving runtime information about a server instance and for transitioning a server from one state to another. [https://docs.oracle.com/cd/E11035_01/wls100/javadocs_mhome/weblogic/management/runtime/ServerRuntimeMBean.html]

    ALSBConfigurationMBean: Provides various API to query, export and import resources, obtain validation errors, get and set environment values, and in general manage resources in an ALSB domain. [https://docs.oracle.com/cd/E13171_01/alsb/docs26/javadoc/com/bea/wli/sb/management/configuration/ALSBConfigurationMBean.html]

    Once the connection to the DomainRuntimeServiceMBean is made, other MBeans can be found via the findService method.

    Service findService(String name,
                        String type,
                        String location)
    

    This method returns the Service on the specified Server or in the primary MBeanServer if the location is not specified.

    In the sample code shown at the end of this article, certain java fields are used. For reading purposes the field values are shown in the following table:

    Field                                              Field value
    DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME    weblogic.management.mbeanservers.domainruntime
    DomainRuntimeServiceMBean.OBJECT_NAME              com.bea:Name=DomainRuntimeService,Type=weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean
    ALSBConfigurationMBean.NAME                        ALSBConfiguration
    ALSBConfigurationMBean.TYPE                        com.bea.wli.sb.management.configuration.ALSBConfigurationMBean
    Ref.DOMAIN                                         <Reference to the domain>
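
As a rough illustration of where these values end up, the sketch below prepares the JMX service URL and the environment map that a stand-alone client would typically pass to JMXConnectorFactory.connect. It is a sketch under assumptions, not the code of the OSBServiceExplorer tool: the class and method names are made up, the t3 protocol and the weblogic.management.remote provider package follow the usual WebLogic client setup, and nothing is actually connected here (that would need wlfullclient.jar and a running AdminServer).

```java
import java.net.MalformedURLException;
import java.util.Hashtable;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.naming.Context;

// Hypothetical helper for illustration; it only prepares connection settings.
final class DomainRuntimeConnectionSettings {

    // Value of DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME as listed in the table above.
    static final String MBEANSERVER_JNDI_NAME =
        "weblogic.management.mbeanservers.domainruntime";

    // Builds the service URL of the Domain Runtime MBean Server on the AdminServer.
    static JMXServiceURL serviceUrl(String hostname, int port) {
        try {
            return new JMXServiceURL("t3", hostname, port, "/jndi/" + MBEANSERVER_JNDI_NAME);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Builds the environment map holding the credentials and the WebLogic protocol provider.
    static Hashtable<String, String> environment(String username, String password) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.SECURITY_PRINCIPAL, username);
        env.put(Context.SECURITY_CREDENTIALS, password);
        env.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES, "weblogic.management.remote");
        return env;
    }

    public static void main(String[] args) {
        System.out.println(serviceUrl("localhost", 7001));
        // With wlfullclient.jar on the classpath, the connection would be opened with:
        // JMXConnectorFactory.connect(serviceUrl(host, port), environment(user, password))
    }
}
```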

    Because of the use of com.bea.wli.config.Ref.class, the following library <Middleware Home Directory>/Oracle_OSB1/modules/com.bea.common.configfwk_1.7.0.0.jar was needed.

    A Ref uniquely represents a resource, project or folder that is managed by the Configuration Framework.

    A special Ref DOMAIN refers to the whole domain.
    [https://docs.oracle.com/cd/E17904_01/apirefs.1111/e15033/com/bea/wli/config/Ref.html]

    Because of the use of weblogic.management.jmx.MBeanServerInvocationHandler.class, the following library <Middleware Home Directory>/wlserver_10.3/server/lib/wlfullclient.jar was needed.

    When running the code the following error was thrown:

    java.lang.RuntimeException: java.lang.ClassNotFoundException: com.bea.wli.sb.management.configuration.DelegatedALSBConfigurationMBean
    	at weblogic.management.jmx.MBeanServerInvocationHandler.newProxyInstance(MBeanServerInvocationHandler.java:621)
    	at weblogic.management.jmx.MBeanServerInvocationHandler.invoke(MBeanServerInvocationHandler.java:418)
    	at $Proxy0.findService(Unknown Source)
    	at nl.xyz.osbservice.osbserviceexplorer.OSBServiceExplorer.<init>(OSBServiceExplorer.java:174)
    	at nl.xyz.osbservice.osbserviceexplorer.OSBServiceExplorer.main(OSBServiceExplorer.java:445)
    Caused by: java.lang.ClassNotFoundException: com.bea.wli.sb.management.configuration.DelegatedALSBConfigurationMBean
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    	at weblogic.management.jmx.MBeanServerInvocationHandler.newProxyInstance(MBeanServerInvocationHandler.java:619)
    	... 4 more
    Process exited.
    
    

    So because of the use of com.bea.wli.sb.management.configuration.DelegatedALSBConfigurationMBean.class the following library <Middleware Home Directory>/Oracle_OSB1/lib/sb-kernel-impl.jar was also needed.

    Runtime information (name and state) of the server instances

    The OSBServiceExplorer tool writes its output to a text file called “OSBServiceExplorer.txt”.

    First, the runtime information (name and state) of the server instances (Administration Server and Managed Servers) of the WebLogic domain is written to file.

    Example content fragment of the text file:

    Found server runtimes:
    - Server name: AdminServer. Server state: RUNNING
    - Server name: ManagedServer1. Server state: RUNNING
    - Server name: ManagedServer2. Server state: RUNNING

    See the code fragment below:

    // Write the name and state of every server runtime found in the domain.
    fileWriter.write("Found server runtimes:\n");
    for (ServerRuntimeMBean serverRuntimeMBean : serverRuntimes) {
        String name = serverRuntimeMBean.getName();
        String state = serverRuntimeMBean.getState();
        fileWriter.write("- Server name: " + name + ". Server state: " +
                         state + "\n");
    }
    fileWriter.write("\n");

    List of Ref objects (projects, folders, or resources)

    Next, a list of Ref objects is written to file, including the total number of objects in the list.

    Example content fragment of the text file:

    Found total of 1132 refs, including the following proxy and business services: 
    …
    - ProxyService: JMSConsumerStuFZKNMessageService-1.0/proxy/JMSConsumerStuFZKNMessageService_PS
    …
    - ProxyService: ZKN ZaakService-2.0/proxy/UpdateZaak_Lk01_PS
    …
    - BusinessService: ZKN ZaakService-2.0/business/eBUS/eBUS_FolderService_BS

    See the code fragment below:

    // Retrieve the refs of all projects, folders and resources under the domain.
    Set<Ref> refs = alsbConfigurationMBean.getRefs(Ref.DOMAIN);

    fileWriter.write("Found total of " + refs.size() + " refs, including the following proxy and business services:\n");

    for (Ref ref : refs) {
        String typeId = ref.getTypeId();

        // Only proxy and business services are reported; all other refs are skipped.
        if (typeId.equalsIgnoreCase("ProxyService")) {
            fileWriter.write("- ProxyService: " + ref.getFullName() +
                             "\n");
        } else if (typeId.equalsIgnoreCase("BusinessService")) {
            fileWriter.write("- BusinessService: " + ref.getFullName() +
                             "\n");
        }
    }

    fileWriter.write("\n");

    As mentioned before, a Ref object uniquely represents a resource, project or folder. A Ref object has two components:

    • typeId that indicates whether it is a project, folder, or a resource
    • array of names of non-zero length.

    For a resource the array of names start with the project name, followed by folder names, and end with the resource name.
    For a project, the Ref object simply contains one name component, that is, the project name.
    A Ref object for a folder contains the project name followed by the names of the folders which it is nested under.

    [https://docs.oracle.com/cd/E17904_01/apirefs.1111/e15033/com/bea/wli/config/Ref.html]
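
To make the naming rules above tangible, here is a small stand-alone sketch that splits the full name of a resource (as printed in the example output earlier) into project, folders and resource name. It assumes that the names are joined with "/" in the full name, as the example output suggests, and that the input refers to a resource. The class and its methods are hypothetical and are not part of the OSB Ref API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that mimics the naming rules quoted above; not the OSB Ref class.
final class RefNameParts {

    // Splits a full name such as "project/folder/resource" into its name components.
    static List<String> names(String fullName) {
        return Arrays.asList(fullName.split("/"));
    }

    // The first name component is always the project name.
    static String projectName(String fullName) {
        return names(fullName).get(0);
    }

    // For a resource: everything between the project name and the resource name.
    static List<String> folderNames(String fullName) {
        List<String> parts = names(fullName);
        if (parts.size() < 3) {
            return Collections.<String>emptyList();
        }
        return parts.subList(1, parts.size() - 1);
    }

    // For a resource: the last name component.
    static String resourceName(String fullName) {
        List<String> parts = names(fullName);
        return parts.get(parts.size() - 1);
    }

    public static void main(String[] args) {
        String fullName = "ZKN ZaakService-2.0/business/eBUS/eBUS_FolderService_BS";
        System.out.println("project : " + projectName(fullName));
        System.out.println("folders : " + folderNames(fullName));
        System.out.println("resource: " + resourceName(fullName));
    }
}
```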

    Below is an example of a Ref object that represents a folder (via JDeveloper Debug):

    Below is an example of a Ref object that represents a resource (via JDeveloper Debug):

    ResourceConfigurationMBean

    In order to be able to determine the actual endpoints of the proxy services and business services, the ResourceConfigurationMBean is used. When connected, the Service Bus MBeans are located under com.oracle.osb. [https://technology.amis.nl/2014/10/20/oracle-service-bus-obtaining-list-exposed-soap-http-endpoints/]

    When we look at the java code, as a next step, the names of a set of MBeans specified by pattern matching are put in a list and looped through.

    Once the connection to the DomainRuntimeServiceMBean is made, other MBeans can be found via the queryNames method.

    Set<ObjectName> queryNames(ObjectName name,
                               QueryExp query)
                               throws IOException
    

    Gets the names of MBeans controlled by the MBean server. This method enables any of the following to be obtained: The names of all MBeans, the names of a set of MBeans specified by pattern matching on the ObjectName and/or a Query expression, a specific MBean name (equivalent to testing whether an MBean is registered). When the object name is null or no domain and key properties are specified, all objects are selected (and filtered if a query is specified). It returns the set of ObjectNames for the MBeans selected.
    [https://docs.oracle.com/javase/7/docs/api/javax/management/MBeanServerConnection.html]

    See the code fragment below:

    String domain = "com.oracle.osb";
    String objectNamePattern =
        domain + ":" + "Type=ResourceConfigurationMBean,*";
    
    Set<ObjectName> osbResourceConfigurations =
        connection.queryNames(new ObjectName(objectNamePattern), null);
    
    fileWriter.write("ResourceConfiguration list of proxy and business services:\n");
    for (ObjectName osbResourceConfiguration :
         osbResourceConfigurations) {
    …
        String canonicalName =
            osbResourceConfiguration.getCanonicalName();
        fileWriter.write("- Resource: " + canonicalName + "\n");
    …
    }

    The pattern used is: com.oracle.osb:Type=ResourceConfigurationMBean,*

    Example content fragment of the text file:

    ResourceConfiguration list of proxy and business services:
    …
    - Resource: com.oracle.osb:Location=AdminServer,Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,Type=ResourceConfigurationMBean
    …
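
The pattern matching itself can be tried without a server, because javax.management.ObjectName is part of the JDK. The sketch below checks the canonical name from the example output against the pattern and splits its Name key property on the $ separator. The class and method names are made up for illustration; reading the parts as type, project, folder and service name is an interpretation of the example output, not something the OSB documentation is quoted on here.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for illustration; it uses only the JDK's ObjectName class.
final class ResourceConfigurationNames {

    static final String PATTERN = "com.oracle.osb:Type=ResourceConfigurationMBean,*";

    // Creates an ObjectName, turning the checked exception into an unchecked one.
    static ObjectName objectName(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // True if the given name would be selected by the pattern used with queryNames.
    static boolean matchesPattern(String canonicalName) {
        return objectName(PATTERN).apply(objectName(canonicalName));
    }

    // Splits the Name key property on '$'.
    static String[] nameParts(String canonicalName) {
        return objectName(canonicalName).getKeyProperty("Name").split("\\$");
    }

    public static void main(String[] args) {
        String name = "com.oracle.osb:Location=AdminServer,"
            + "Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,"
            + "Type=ResourceConfigurationMBean";
        System.out.println("matches pattern: " + matchesPattern(name));
        for (String part : nameParts(name)) {
            System.out.println("- " + part);
        }
    }
}
```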

    Below is an example of an ObjectName object (via JDeveloper Debug), found via the queryNames method:

    Via the Oracle Enterprise Manager Fusion Middleware Control for a certain domain, the System MBean Browser can be opened. Here the previously mentioned ResourceConfigurationMBeans can be found.


    [Via MBean Browser]

    The information on the right is as follows (if we navigate to a particular ResourceConfigurationMBean, for example …$UpdateZaak_Lk01_PS) :


    [Via MBean Browser]

    Here we can see that the attributes Configuration and Metadata are available:

    • Configuration

    [Via MBean Browser]

    The Configuration is made available in java by the following code fragment:

    CompositeDataSupport configuration = (CompositeDataSupport)connection.getAttribute(osbResourceConfiguration,"Configuration");

    • Metadata

    [Via MBean Browser]

    The Metadata is made available in java by the following code fragment:

    CompositeDataSupport metadata = (CompositeDataSupport)connection.getAttribute(osbResourceConfiguration,"Metadata");

    Diving into attribute Configuration of the ResourceConfigurationMBean

    For each found proxy and business service the configuration information (canonicalName, service-type, transport-type, url) is written to file.

    See the code fragment below:

    String canonicalName =
        osbResourceConfiguration.getCanonicalName();
    …
    String servicetype =
        (String)configuration.get("service-type");
    CompositeDataSupport transportconfiguration =
        (CompositeDataSupport)configuration.get("transport-configuration");
    String transporttype =
        (String)transportconfiguration.get("transport-type");
    …
    fileWriter.write("  Configuration of " + canonicalName +
                     ":" + " service-type=" + servicetype +
                     ", transport-type=" + transporttype +
                     ", url=" + url + "\n");

    Proxy service configuration:

    Below is an example of a proxy service configuration (content fragment of the text file):

      Configuration of com.oracle.osb:Location=AdminServer,Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,Type=ResourceConfigurationMBean: service-type=Abstract SOAP, transport-type=local, url=local

    The proxy services which define the exposed endpoints, can be recognized by the ProxyService$ prefix.


    [Via MBean Browser]

    For getting the endpoint, see the code fragment below:

    String url = (String)transportconfiguration.get("url");

    Business service configuration:

    Below is an example of a business service configuration (content fragment of the text file):

      Configuration of com.oracle.osb:Location=AdminServer,Name=BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_FolderService_BS,Type=ResourceConfigurationMBean: service-type=SOAP, transport-type=http, url=http://xyz/eBus/FolderService.svc

    The business services which define the exposed endpoints, can be recognized by the BusinessService$ prefix.


    [Via MBean Browser]

    For getting the endpoint, see the code fragment below:

    CompositeData[] urlconfiguration =
        (CompositeData[])transportconfiguration.get("url-configuration");
    String url = (String)urlconfiguration[0].get("url");

    So, via the url key found in the business service configuration, the endpoint of a business service can be found (for example: http://xyz/eBus/FolderService.svc). In this way, the dependencies of proxy and/or business services on certain external web services (having a certain endpoint) could be found.

    Proxy service pipeline, element hierarchy

    For a proxy service the elements (nodes) of the pipeline are investigated.

    See the code fragment below:

    CompositeDataSupport pipeline =
        (CompositeDataSupport)configuration.get("pipeline");
    TabularDataSupport nodes =
        (TabularDataSupport)pipeline.get("nodes");


    [Via MBean Browser]

    Below is an example of a nodes object (via JDeveloper Debug):

    If we take a look at the dataMap object, we can see nodes of different types.

    Below is an example of a node of type Stage (via JDeveloper Debug):

    Below is an example of a node of type Action and label ifThenElse (via JDeveloper Debug):

    Below is an example of a node of type Action and label wsCallout (via JDeveloper Debug):

    For the examples above the Message Flow part of the UpdateZaak_Lk01_PS proxy service looks like:

    The mapping between the node-id and the corresponding element in the Message Flow can be achieved by looking in the .proxy file (in this case: UpdateZaak_Lk01_PS.proxy) for the _ActionId- identification, mentioned as value for the name key.

    <con:stage name="EditFolderZaakStage">
            <con:context>
              …
            </con:context>
            <con:actions>
              <con3:ifThenElse>
                <con2:id>_ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7c84</con2:id>
                <con3:case>
                  <con3:condition>
                    …
                  </con3:condition>
                  <con3:actions>
                    <con3:wsCallout>
                      <con2:id>_ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7b7f</con2:id>
                      …

    The first node in the dataMap object (via JDeveloper Debug) looks like:

    The dataMap object is of type HashMap. A HashMap maintains key and value pairs and is often denoted as HashMap<Key, Value> or HashMap<K, V>. HashMap implements the Map interface.

    As can be seen, the key is of type Object and the value of type CompositeData.

    In order to know what kind of information is delivered via the CompositeData object, the rowType object can be used.

    See the code fragment below:

    TabularType tabularType = nodes.getTabularType();
    CompositeType rowType = tabularType.getRowType();

    Below is an example of a rowType object (via JDeveloper Debug):

    From this it is now clear that the CompositeData object for a ProxyServicePipelineElementType contains:

    Index    key         value
    0        children    Children of this node
    1        label       Label
    2        name        Name of the node
    3        node-id     Id of this node unique within the graph
    4        type        Pipeline element type

    In the code fragment below, an iterator is used to loop through the dataMap object.

    Iterator keyIter = nodes.keySet().iterator();
    
    for (int j = 0; keyIter.hasNext(); ++j) {
    
        Object[] key = ((Collection)keyIter.next()).toArray();
    
        CompositeData compositeData = nodes.get(key);
    
        …
    }
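
The same iteration can be tried without an OSB server, because the open MBean classes are part of the JDK. The sketch below builds a tiny table with the five keys listed above and loops over it in the same way. All data in it is made up, and the item types (Integer for node-id, an Integer array for children) and the choice of node-id as the only index are assumptions, not facts taken from OSB.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;

// Stand-alone illustration with made-up data; it does not connect to OSB.
final class PipelineNodesDemo {

    static final String[] ITEMS = {"children", "label", "name", "node-id", "type"};

    // Builds a table with two rows, indexed by node-id (an assumption).
    static TabularDataSupport sampleNodes() {
        try {
            OpenType<?>[] itemTypes = {
                new ArrayType<Integer[]>(1, SimpleType.INTEGER),
                SimpleType.STRING, SimpleType.STRING,
                SimpleType.INTEGER, SimpleType.STRING};
            CompositeType rowType = new CompositeType(
                "Node", "made-up pipeline node", ITEMS, ITEMS, itemTypes);
            TabularType tabularType = new TabularType(
                "Nodes", "made-up pipeline nodes", rowType, new String[] {"node-id"});
            TabularDataSupport nodes = new TabularDataSupport(tabularType);
            nodes.put(new CompositeDataSupport(rowType, ITEMS, new Object[] {
                new Integer[] {24}, "stage", "EditFolderZaakStage", 25, "Stage"}));
            nodes.put(new CompositeDataSupport(rowType, ITEMS, new Object[] {
                new Integer[0], "wsCallout", "_ActionId-example", 24, "Action"}));
            return nodes;
        } catch (OpenDataException e) {
            throw new IllegalStateException(e);
        }
    }

    // Loops over the keys; each key is a List holding the index values of one row.
    static List<String> describe(TabularDataSupport nodes) {
        List<String> lines = new ArrayList<>();
        for (Object rawKey : nodes.keySet()) {
            Object[] key = ((List<?>) rawKey).toArray();
            CompositeData node = nodes.get(key);
            lines.add(node.get("node-id") + " " + node.get("type") + " " + node.get("label"));
        }
        return lines;
    }

    public static void main(String[] args) {
        TabularDataSupport nodes = sampleNodes();
        System.out.println(nodes.getTabularType().getRowType().keySet());
        describe(nodes).forEach(System.out::println);
    }
}
```

The order in which the rows come back is not guaranteed, since the rows are kept in a hash map by default.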

    The key object for the first node in the dataMap object (via JDeveloper Debug) looks like:

    The value of this key object is 25, which also is shown as the value for the node-id of the compositeData object, which for the first node in the dataMap object (via JDeveloper Debug) looks like:

    It’s obvious that the nodes in the pipeline form a hierarchy. A node can have children, which in turn can also have children, etc. Think for example of a “Stage” having an “If Then” action which in turn contains several “Assign” actions. A proxy service Message Flow can of course contain all kinds of elements (see the Design Palette).

    Below is (for another proxy service) an example content fragment of the text file, that reflects the hierarchy:

         Index#76:
           level    = 1
           label    = branch-node
           name     = CheckOperationOperationalBranch
           node-id  = 62
           type     = OperationalBranchNode
           children = [42,46,50,61]
             level    = 2
             node-id  = 42
             children = [41]
               level    = 3
               label    = route-node
               name     = creeerZaak_Lk01RouteNode
               node-id  = 41
               type     = RouteNode
               children = [40]
                 level    = 4
                 node-id  = 40
                 children = [39]
                   level    = 5
                   label    = route
                   name     = _ActionId-4977625172784205635-3567e5a2.15364c39a7e.-7b99
                   node-id  = 39
                   type     = Action
                   children = []
             level    = 2
             node-id  = 46
             children = [45]
               level    = 3
               label    = route-node
               name     = updateZaak_Lk01RouteNode
               node-id  = 45
               type     = RouteNode
               children = [44]
                 level    = 4
                 node-id  = 44
                 children = [43]
                   level    = 5
                   label    = route
                   name     = _ActionId-4977625172784205635-3567e5a2.15364c39a7e.-7b77
                   node-id  = 43
                   type     = Action
                   children = []
             …
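The indentation in this fragment follows from a depth-first walk over the children lists. The sketch below shows that walk in isolation, on a plain map instead of the MBean data. `NodeTreeWalker` and `sampleTree()` are hypothetical names, and the sample tree is a simplified version of the fragment above (only two of the four children of node 62 are included, and the intermediate nodes 40, 44, 39 and 43 are left out).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NodeTreeWalker {

    /** Walks the hierarchy depth-first, starting at level 1 for the given node. */
    public static List<String> walk(Map<String, String[]> childrenById, String nodeId) {
        List<String> out = new ArrayList<>();
        visit(childrenById, nodeId, 1, out);
        return out;
    }

    private static void visit(Map<String, String[]> childrenById, String nodeId,
                              int level, List<String> out) {
        out.add("level " + level + ": node-id " + nodeId);
        // A node without an entry is treated as a leaf.
        for (String child : childrenById.getOrDefault(nodeId, new String[0])) {
            visit(childrenById, child, level + 1, out);
        }
    }

    /** Simplified, hypothetical version of the hierarchy shown in the text file. */
    public static Map<String, String[]> sampleTree() {
        Map<String, String[]> tree = new LinkedHashMap<>();
        tree.put("62", new String[] {"42", "46"});
        tree.put("42", new String[] {"41"});
        tree.put("46", new String[] {"45"});
        tree.put("41", new String[0]);
        tree.put("45", new String[0]);
        return tree;
    }

    public static void main(String[] args) {
        walk(sampleTree(), "62").forEach(System.out::println);
    }
}
```

Unlike the program in this post, which reuses the key array for every recursive call, the sketch passes the child id as a separate argument, which keeps the caller's key unchanged.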
    

    Because only certain kinds of nodes are of interest (Route, Java Callout, Service Callout, etc.), some filtering is needed. For this, the label and type keys are used.

    See the code fragment below:

    String label = (String)compositeData.get("label");
    String type = (String)compositeData.get("type");
    
    if (type.equals("Action") &&
        (label.contains("wsCallout") ||
         label.contains("javaCallout") ||
         label.contains("route"))) {
    
        fileWriter.write("    Index#" + j + ":\n");
        printCompositeData(nodes, key, 1);
    } else if (type.equals("OperationalBranchNode") ||
               type.equals("RouteNode"))
    {
        fileWriter.write("    Index#" + j + ":\n");
        printCompositeData(nodes, key, 1);
    }

    Example content fragment of the text file:

        Index#72:
           level    = 1
           label    = wsCallout
           name     = _ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7b7f
           node-id  = 71
           type     = Action
           children = [66,70]
        Index#98:
           level    = 1
           label    = wsCallout
           name     = _ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7997
           node-id  = 54
           type     = Action
           children = [48,53]
        Index#106:
           level    = 1
           label    = wsCallout
           name     = _ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7cf4
           node-id  = 35
           type     = Action
           children = [30,34]
    

    When we take a closer look at the node of type Action and label wsCallout with index 106, this can also be found in the MBean Browser:


    [Via MBean Browser]

    The child node-ids are 30 (a node of type Sequence and name requestTransform, also having children) and 34 (a node of type Sequence and name responseTransform, also having children).

    Diving into attribute Metadata of the ResourceConfigurationMBean

    For each found proxy service the metadata information (dependencies and dependents) is written to file.

    See the code fragment below:

    fileWriter.write("  Metadata of " + canonicalName + "\n");
    
    String[] dependencies =
        (String[])metadata.get("dependencies");
    fileWriter.write("    dependencies:\n");
    int size;
    size = dependencies.length;
    for (int i = 0; i < size; i++) {
        String dependency = dependencies[i];
        if (!dependency.contains("Xquery")) {
            fileWriter.write("      - " + dependency + "\n");
        }
    }
    fileWriter.write("" + "\n");
    
    String[] dependents = (String[])metadata.get("dependents");
    fileWriter.write("    dependents:\n");
    size = dependents.length;
    for (int i = 0; i < size; i++) {
        String dependent = dependents[i];
        fileWriter.write("      - " + dependent + "\n");
    }
    fileWriter.write("" + "\n");

    Example content fragment of the text file:

      Metadata of com.oracle.osb:Location=AdminServer,Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,Type=ResourceConfigurationMBean
        dependencies:
          - BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_FolderService_BS
          - XMLSchema$CDM$Interface$StUF-ZKN_1_1_02$zkn0310$mutatie$zkn0310_msg_mutatie
          - BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_SearchService_BS
          - BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_LookupService_BS
    
        dependents:
          - ProxyService$JMSConsumerStuFZKNMessageService-1.0$proxy$JMSConsumerStuFZKNMessageService_PS
          - ProxyService$ZKN ZaakService-2.0$proxy$ZaakService_PS
    

    As can be seen in the MBean Browser, the metadata for a particular proxy service shows the dependencies on other resources (like business services and XML Schemas) and other services that are dependent on the proxy service.


    [Via MBean Browser]

    By looking at the results in the text file "OSBServiceExplorer.txt", the dependencies between services (proxy and business) and also the dependencies with certain external web services (with a particular endpoint) could be extracted.
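To match a dependency entry against the service list at the top of the file, the entry can be split into a resource type and a path. The sketch below assumes, based only on the sample output in this post, that '$' separates the type from the path segments; this is an inference, not a documented format, and `DependencyNameParser` is a hypothetical helper.

```java
public class DependencyNameParser {

    /**
     * Splits an entry such as "BusinessService$Project$folder$Name" into
     * { "BusinessService", "Project/folder/Name" }.
     * Assumes '$' is the separator, as seen in the sample output.
     */
    public static String[] parse(String entry) {
        int first = entry.indexOf('$');
        if (first < 0) {
            // No separator found: return the entry unchanged with an empty type.
            return new String[] {"", entry};
        }
        String type = entry.substring(0, first);
        String path = entry.substring(first + 1).replace('$', '/');
        return new String[] {type, path};
    }

    public static void main(String[] args) {
        String[] parts =
            parse("BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_FolderService_BS");
        System.out.println(parts[0] + ": " + parts[1]);
    }
}
```

The resulting path has the same shape as the names written in the "Found total of ... refs" part of the file, so the two lists can be compared as plain strings.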

    Example content of the text file:

    Found server runtimes:
    - Server name: AdminServer. Server state: RUNNING
    - Server name: ManagedServer1. Server state: RUNNING
    - Server name: ManagedServer2. Server state: RUNNING
    
    Found total of 1132 refs, including the following proxy and business services: 
    …
    - ProxyService: JMSConsumerStuFZKNMessageService-1.0/proxy/JMSConsumerStuFZKNMessageService_PS
    …
    - ProxyService: ZKN ZaakService-2.0/proxy/UpdateZaak_Lk01_PS
    …
    - BusinessService: ZKN ZaakService-2.0/business/eBUS/eBUS_FolderService_BS
    …
    
    ResourceConfiguration list of proxy and business services:
    …
    - Resource: com.oracle.osb:Location=AdminServer,Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,Type=ResourceConfigurationMBean
      Configuration of com.oracle.osb:Location=AdminServer,Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,Type=ResourceConfigurationMBean: service-type=Abstract SOAP, transport-type=local, url=local
    
        Index#72:
           level    = 1
           label    = wsCallout
           name     = _ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7b7f
           node-id  = 71
           type     = Action
           children = [66,70]
        Index#98:
           level    = 1
           label    = wsCallout
           name     = _ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7997
           node-id  = 54
           type     = Action
           children = [48,53]
        Index#106:
           level    = 1
           label    = wsCallout
           name     = _ActionId-7997641858449402984--36d1ada1.1562c8caabd.-7cf4
           node-id  = 35
           type     = Action
           children = [30,34]
    
      Metadata of com.oracle.osb:Location=AdminServer,Name=ProxyService$ZKN ZaakService-2.0$proxy$UpdateZaak_Lk01_PS,Type=ResourceConfigurationMBean
        dependencies:
          - BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_FolderService_BS
          - XMLSchema$CDM$Interface$StUF-ZKN_1_1_02$zkn0310$mutatie$zkn0310_msg_mutatie
          - BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_SearchService_BS
          - BusinessService$ZKN ZaakService-2.0$business$eBUS$eBUS_LookupService_BS
    
        dependents:
          - ProxyService$JMSConsumerStuFZKNMessageService-1.0$proxy$JMSConsumerStuFZKNMessageService_PS
          - ProxyService$ZKN ZaakService-2.0$proxy$ZaakService_PS
    …

    The Java code:

    package nl.xyz.osbservice.osbserviceexplorer;
    
    
    import com.bea.wli.config.Ref;
    import com.bea.wli.sb.management.configuration.ALSBConfigurationMBean;
    
    import java.io.FileWriter;
    import java.io.IOException;
    
    import java.net.MalformedURLException;
    
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Hashtable;
    import java.util.Iterator;
    import java.util.Properties;
    import java.util.Set;
    
    import javax.management.MBeanServerConnection;
    import javax.management.MalformedObjectNameException;
    import javax.management.ObjectName;
    import javax.management.openmbean.CompositeData;
    import javax.management.openmbean.CompositeDataSupport;
    import javax.management.openmbean.CompositeType;
    import javax.management.openmbean.TabularDataSupport;
    import javax.management.openmbean.TabularType;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    
    import javax.naming.Context;
    
    import weblogic.management.jmx.MBeanServerInvocationHandler;
    import weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean;
    import weblogic.management.runtime.ServerRuntimeMBean;
    
    
    public class OSBServiceExplorer {
        private static MBeanServerConnection connection;
        private static JMXConnector connector;
        private static FileWriter fileWriter;
    
        /**
         * Indent a string
         * @param indent - The number of indentations to add before a string 
         * @return String - The indented string
         */
        private static String getIndentString(int indent) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < indent; i++) {
                sb.append("  ");
            }
            return sb.toString();
        }
    
    
        /**
         * Print composite data (write to file)
         * @param nodes - The list of nodes
         * @param key - The list of keys
         * @param level - The level in the hierarchy of nodes
         */
        private void printCompositeData(TabularDataSupport nodes, Object[] key,
                                        int level) {
            try {
                CompositeData compositeData = nodes.get(key);
    
                fileWriter.write(getIndentString(level) + "     level    = " +
                                 level + "\n");
    
                String label = (String)compositeData.get("label");
                String name = (String)compositeData.get("name");
                String nodeid = (String)compositeData.get("node-id");
                String type = (String)compositeData.get("type");
                String[] childeren = (String[])compositeData.get("children");
                if (level == 1 ||
                    (label.contains("route-node") || label.contains("route"))) {
                    fileWriter.write(getIndentString(level) + "     label    = " +
                                     label + "\n");
    
                    fileWriter.write(getIndentString(level) + "     name     = " +
                                     name + "\n");
    
                    fileWriter.write(getIndentString(level) + "     node-id  = " +
                                     nodeid + "\n");
    
                    fileWriter.write(getIndentString(level) + "     type     = " +
                                     type + "\n");
    
                    fileWriter.write(getIndentString(level) + "     children = [");
    
                    int size = childeren.length;
    
                    for (int i = 0; i < size; i++) {
                        fileWriter.write(childeren[i]);
                        if (i < size - 1) {
                            fileWriter.write(",");
                        }
                    }
                    fileWriter.write("]\n");
                } else if (level >= 2) {
                    fileWriter.write(getIndentString(level) + "     node-id  = " +
                                     nodeid + "\n");
    
                    fileWriter.write(getIndentString(level) + "     children = [");
    
                    int size = childeren.length;
    
                    for (int i = 0; i < size; i++) {
                        fileWriter.write(childeren[i]);
                        if (i < size - 1) {
                            fileWriter.write(",");
                        }
                    }
                    fileWriter.write("]\n");
                }
                if ((level == 1 && type.equals("OperationalBranchNode")) || level > 1) {
                    level++;
    
                    int size = childeren.length;
    
                    for (int i = 0; i < size; i++) {
                        key[0] = childeren[i];
                        printCompositeData(nodes, key, level);
                    }
                }
    
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        public OSBServiceExplorer(HashMap props) {
            super();
    
    
            try {
    
                Properties properties = new Properties();
                properties.putAll(props);
    
                initConnection(properties.getProperty("HOSTNAME"),
                               properties.getProperty("PORT"),
                               properties.getProperty("USERNAME"),
                               properties.getProperty("PASSWORD"));
    
    
                DomainRuntimeServiceMBean domainRuntimeServiceMBean =
                    (DomainRuntimeServiceMBean)findDomainRuntimeServiceMBean(connection);
    
                ServerRuntimeMBean[] serverRuntimes =
                    domainRuntimeServiceMBean.getServerRuntimes();
    
                fileWriter = new FileWriter("OSBServiceExplorer.txt", false);
    
    
                fileWriter.write("Found server runtimes:\n");
                int length = (int)serverRuntimes.length;
                for (int i = 0; i < length; i++) {
                    ServerRuntimeMBean serverRuntimeMBean = serverRuntimes[i];
    
                    String name = serverRuntimeMBean.getName();
                    String state = serverRuntimeMBean.getState();
                    fileWriter.write("- Server name: " + name +
                                     ". Server state: " + state + "\n");
                }
                fileWriter.write("" + "\n");
    
                // Create an mbean instance to perform configuration operations in the created session.
                //
                // There is a separate instance of ALSBConfigurationMBean for each session.
                // There is also one more ALSBConfigurationMBean instance which works on the core data, i.e., the data which ALSB runtime uses.
                // An ALSBConfigurationMBean instance is created whenever a new session is created via the SessionManagementMBean.createSession(String) API.
                // This mbean instance is then used to perform configuration operations in that session.
                // The mbean instance is destroyed when the corresponding session is activated or discarded.
                ALSBConfigurationMBean alsbConfigurationMBean =
                    (ALSBConfigurationMBean)domainRuntimeServiceMBean.findService(ALSBConfigurationMBean.NAME,
                                                                                  ALSBConfigurationMBean.TYPE,
                                                                                  null);
    
                Set<Ref> refs = alsbConfigurationMBean.getRefs(Ref.DOMAIN);
    
    
                fileWriter.write("Found total of " + refs.size() +
                                 " refs, including the following proxy and business services:\n");
    
                for (Ref ref : refs) {
                    String typeId = ref.getTypeId();
    
                    if (typeId.equalsIgnoreCase("ProxyService")) {
    
                        fileWriter.write("- ProxyService: " + ref.getFullName() +
                                         "\n");
                    } else if (typeId.equalsIgnoreCase("BusinessService")) {
                        fileWriter.write("- BusinessService: " +
                                         ref.getFullName() + "\n");
                    } else {
                        //fileWriter.write(ref.getFullName());
                    }
                }
    
                fileWriter.write("" + "\n");
    
                String domain = "com.oracle.osb";
                String objectNamePattern =
                    domain + ":" + "Type=ResourceConfigurationMBean,*";
    
                Set<ObjectName> osbResourceConfigurations =
                    connection.queryNames(new ObjectName(objectNamePattern), null);
    
                fileWriter.write("ResourceConfiguration list of proxy and business services:\n");
                for (ObjectName osbResourceConfiguration :
                     osbResourceConfigurations) {
    
                    CompositeDataSupport configuration =
                        (CompositeDataSupport)connection.getAttribute(osbResourceConfiguration,
                                                                      "Configuration");
    
                    CompositeDataSupport metadata =
                        (CompositeDataSupport)connection.getAttribute(osbResourceConfiguration,
                                                                      "Metadata");
    
                    String canonicalName =
                        osbResourceConfiguration.getCanonicalName();
                    fileWriter.write("- Resource: " + canonicalName + "\n");
                    if (canonicalName.contains("ProxyService")) {
                        String servicetype =
                            (String)configuration.get("service-type");
                        CompositeDataSupport transportconfiguration =
                            (CompositeDataSupport)configuration.get("transport-configuration");
                        String transporttype =
                            (String)transportconfiguration.get("transport-type");
                        String url = (String)transportconfiguration.get("url");
                        
                        fileWriter.write("  Configuration of " + canonicalName +
                                         ":" + " service-type=" + servicetype +
                                         ", transport-type=" + transporttype +
                                         ", url=" + url + "\n");
                    } else if (canonicalName.contains("BusinessService")) {
                        String servicetype =
                            (String)configuration.get("service-type");
                        CompositeDataSupport transportconfiguration =
                            (CompositeDataSupport)configuration.get("transport-configuration");
                        String transporttype =
                            (String)transportconfiguration.get("transport-type");
                        CompositeData[] urlconfiguration =
                            (CompositeData[])transportconfiguration.get("url-configuration");
                        String url = (String)urlconfiguration[0].get("url");
    
                        fileWriter.write("  Configuration of " + canonicalName +
                                         ":" + " service-type=" + servicetype +
                                         ", transport-type=" + transporttype +
                                         ", url=" + url + "\n");
                    }
    
                    if (canonicalName.contains("ProxyService")) {
    
                        fileWriter.write("" + "\n");
    
                        CompositeDataSupport pipeline =
                            (CompositeDataSupport)configuration.get("pipeline");
                        TabularDataSupport nodes =
                            (TabularDataSupport)pipeline.get("nodes");
    
                        TabularType tabularType = nodes.getTabularType();
                        CompositeType rowType = tabularType.getRowType();
    
                        Iterator keyIter = nodes.keySet().iterator();
    
                        for (int j = 0; keyIter.hasNext(); ++j) {
    
                            Object[] key = ((Collection)keyIter.next()).toArray();
    
                            CompositeData compositeData = nodes.get(key);
    
                            String label = (String)compositeData.get("label");
                            String type = (String)compositeData.get("type");
                            if (type.equals("Action") &&
                                (label.contains("wsCallout") ||
                                 label.contains("javaCallout") ||
                                 label.contains("route"))) {
    
                                fileWriter.write("    Index#" + j + ":\n");
                                printCompositeData(nodes, key, 1);
                            } else if (type.equals("OperationalBranchNode") ||
                                       type.equals("RouteNode")) {
    
                                fileWriter.write("    Index#" + j + ":\n");
                                printCompositeData(nodes, key, 1);
                            }
                        }
    
                        fileWriter.write("" + "\n");
                        fileWriter.write("  Metadata of " + canonicalName + "\n");
    
                        String[] dependencies =
                            (String[])metadata.get("dependencies");
                        fileWriter.write("    dependencies:\n");
                        int size;
                        size = dependencies.length;
                        for (int i = 0; i < size; i++) {
                            String dependency = dependencies[i];
                            if (!dependency.contains("Xquery")) {
                                fileWriter.write("      - " + dependency + "\n");
                            }
                        }
                        fileWriter.write("" + "\n");
    
                        String[] dependents = (String[])metadata.get("dependents");
                        fileWriter.write("    dependents:\n");
                        size = dependents.length;
                        for (int i = 0; i < size; i++) {
                            String dependent = dependents[i];
                            fileWriter.write("      - " + dependent + "\n");
                        }
                        fileWriter.write("" + "\n");
    
                    }
    
                }
                fileWriter.close();
    
                System.out.println("Successfully completed");
    
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (connector != null)
                    try {
                        connector.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            }
        }
    
    
        /*
           * Initialize connection to the Domain Runtime MBean Server.
           */
    
        public static void initConnection(String hostname, String portString,
                                          String username,
                                          String password) throws IOException,
                                                                  MalformedURLException {
    
            String protocol = "t3";
            Integer portInteger = Integer.valueOf(portString);
            int port = portInteger.intValue();
            String jndiroot = "/jndi/";
            String mbeanserver = DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME;
    
            JMXServiceURL serviceURL =
                new JMXServiceURL(protocol, hostname, port, jndiroot +
                                  mbeanserver);
    
            Hashtable hashtable = new Hashtable();
            hashtable.put(Context.SECURITY_PRINCIPAL, username);
            hashtable.put(Context.SECURITY_CREDENTIALS, password);
            hashtable.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES,
                          "weblogic.management.remote");
            hashtable.put("jmx.remote.x.request.waiting.timeout", Long.valueOf(10000L));
    
            connector = JMXConnectorFactory.connect(serviceURL, hashtable);
            connection = connector.getMBeanServerConnection();
        }
    
    
        private static Ref constructRef(String refType, String serviceURI) {
            Ref ref = null;
            String[] uriData = serviceURI.split("/");
            ref = new Ref(refType, uriData);
            return ref;
        }
    
    
        /**
         * Finds the specified MBean object
         *
         * @param connection - A connection to the MBeanServer.
         * @return Object - The MBean or null if the MBean was not found.
         */
        public Object findDomainRuntimeServiceMBean(MBeanServerConnection connection) {
            try {
                ObjectName objectName =
                    new ObjectName(DomainRuntimeServiceMBean.OBJECT_NAME);
                return (DomainRuntimeServiceMBean)MBeanServerInvocationHandler.newProxyInstance(connection,
                                                                                                objectName);
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
                return null;
            }
        }
    
    
        public static void main(String[] args) {
            try {
                // All four arguments are read below, so require all of them.
                if (args.length < 4) {
                    System.out.println("Provide values for the following parameters: HOSTNAME, PORT, USERNAME, PASSWORD.");
    
                } else {
                    HashMap<String, String> map = new HashMap<String, String>();
    
                    map.put("HOSTNAME", args[0]);
                    map.put("PORT", args[1]);
                    map.put("USERNAME", args[2]);
                    map.put("PASSWORD", args[3]);
                    OSBServiceExplorer osbServiceExplorer =
                        new OSBServiceExplorer(map);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
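The program finds the ResourceConfigurationMBeans with an ObjectName pattern ("com.oracle.osb:Type=ResourceConfigurationMBean,*"). The sketch below shows the same queryNames technique against the platform MBean server of the local JVM, so it can be tried without a WebLogic domain. `ObjectNamePatternDemo` is a hypothetical name, and the "java.lang" pattern is only a stand-in for the OSB one.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class ObjectNamePatternDemo {

    /** Returns the canonical names of all local MBeans matching the pattern. */
    public static Set<String> namesMatching(String pattern) {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            Set<String> result = new TreeSet<>();
            // A null query expression means: filter on the name pattern only.
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                result.add(name.getCanonicalName());
            }
            return result;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid pattern: " + pattern, e);
        }
    }

    public static void main(String[] args) {
        // The trailing ",*" matches any additional key properties, including none.
        namesMatching("java.lang:type=Runtime,*").forEach(System.out::println);
    }
}
```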
    
    

    The post Oracle Service Bus : Service Exploring via WebLogic Server MBeans with JMX appeared first on AMIS Oracle and Java Blog.

    Getting ADF Data in a Jet Component (2)

    Wed, 2017-03-08 06:47

    In my previous blog I explained how to get ADF data into a JET component by iterating through a View Object and rendering one component per element in the View Object. When you want to use the DVTs from Oracle JET, this approach does not work, because the entire data set must be present at once in your component. This blog shows how to do that without using REST services.

    My colleague Lucas Jellema made a JSONProviderBean, which makes data from data bindings available as a nested JSON object in client-side JavaScript [1].

    Using this bean we can use the iterator binding of our View Object in our component page fragment.

    
    
    <div>
     <amis-chart chart-data="#{jsonProviderBean[bindings.EmployeesVO.DCIteratorBinding]}"/>
     </div>
    
    
    

    This will pass the JSON as a string to our component.

        {
            "properties": {
                "chartData": {
                    "type": "string"
                }
            }
             
        }
    

    In our component viewmodel we can now parse this string into a JSON object. The "values" object of this JSON object contains the data we need for our bar chart, but not in a form the bar chart can understand. Therefore you need to write a function that extracts the data you need and puts it into a format the bar chart does understand.

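    // Module wrapper assumed; only the 'knockout' dependency is visible in the code shown.
    define(['knockout'], function (ko) {
        function AmisChartComponentModel(context) {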
            var self = this;
        
            context.props.then(function (propertymap) {
                self.properties = propertymap;
                var dataAsJson = JSON.parse(propertymap.chartData);
                var barData = self.createBarSeries(dataAsJson.values);
                /* set chart data */
                self.barSeriesValue = ko.observableArray(barData);
            })
            
            //function to transform the data.
            self.createBarSeries = function (jsonDataArray) {
                var data = [];
                jsonDataArray.forEach(function (item, index, arr) {
                    data.push( {
                        "name" : item.FirstName, "items" : [item.Salary]
                    });
                })
                return data;
            }    
            
        }
        return AmisChartComponentModel;
    
    });
    

    We now have our entire employee data set available for the bar chart. In this case I made a chart of Salary per employee. We can use all the interactions that the component normally offers, for example stacking the data or switching from a horizontal to a vertical graph.

       

    Sources
    1. https://github.com/lucasjellema/adf-binding-to-json
    2. https://technology.amis.nl/2017/03/07/getting-adf-data-jet-component/
    3. http://andrejusb.blogspot.nl/2015/12/improved-jet-rendering-in-adf.html
    4. https://blogs.oracle.com/groundside/entry/jet_composite_components_i_backgrounder (and the other blogs)
    5. Source of this demo: Github
    Versions used

    JDeveloper 12.1.3,
    OracleJet V2.2.0

    Disclaimer

    The information is based on my personal research. At the moment, Oracle does not support or encourage integrating ADF and Jet. Oracle is working on JET Composite Components in ADF.

    The post Getting ADF Data in a Jet Component (2) appeared first on AMIS Oracle and Java Blog.

    Getting ADF Data in a Jet Component (1)

    Tue, 2017-03-07 09:33

    Oracle JET has been around for a while, and at this moment we are investigating what it would take to integrate JET with our existing ADF application. In the current ADF application we want to make a dashboard in JET; however, we still need to know which project the data is needed for. Therefore I am researching how to get data from our ADF application into our JET part. In this blog I will show you a quick and easy way to get your ADF BC data into your JET components without using REST services.

    I used the blog of Andrejus [1] to set up JET within my ADF Application.

    Add the JET libraries to the public_html folder of the ViewController project.

    (Final) Structure of the project:

    Make a jsf page and use af:resource to load the css and RequireJS, and add the main.js:

    <?xml version='1.0' encoding='UTF-8'?>
    <!DOCTYPE html>
    <f:view xmlns:f="http://java.sun.com/jsf/core" xmlns:af="http://xmlns.oracle.com/adf/faces/rich" xmlns:dvt="http://xmlns.oracle.com/dss/adf/faces" xmlns:ui="http://java.sun.com/jsf/facelets">
        <af:document title="main.jsf" id="d1">
            <af:messages id="m1"/>
            <af:resource type="css" source="jet/css/alta/2.2.0/web/alta.min.css"/>
            <af:resource type="javascript" source="jet/js/libs/require/require.js"/>
            <script>
              require.config( {
                  baseUrl : "jet/js"
              });

              require(["main"]);
            </script>
            <af:form id="f1">
            
            </af:form>
        </af:document>
    </f:view>
    
    

    Then I added my composite component folder to the js folder of JET. My component is named amis-person and shows the name of the person in capital letters and the email address within a blue box. You can read more about composite components in the blog series of Duncan [2].

    Put the metadata directly in the loader.js instead of via a json file, otherwise it will not work!. When you do it via the .json file and you console.log the in the function, you will see it does not print out the metadata from the .json file.

    
    define(['ojs/ojcore',
            './amis-person',
            'text!./amis-person.html',
            'css!./amis-person',
            'ojs/ojcomposite'],
      function (oj, ComponentModel, view, css) {
        'use strict';
        // Metadata is declared inline rather than loaded from a .json file.
        var metadata = '{ "properties": { "amisPersonName": { "type": "string"}, "amisPersonEmail": { "type": "string"}} }';
        oj.Composite.register('amis-person', {
          metadata: { inline: JSON.parse(metadata) },
          viewModel: { inline: ComponentModel },
          view: { inline: view },
          css: { inline: css }
        });
      });
    

    Import the component in main.js to make it available.

    require(['ojs/ojcore', 'knockout', 'jquery', 'ojs/ojknockout', 'jet-composites/amis-person/loader'],
    function (oj, ko, $) {
        function ViewModel() {
            var self = this;
        }
        ko.applyBindings(new ViewModel(), document.body);
    });
    
    

    Create a page fragment where you will put the html to show your component, in this case it is just the composite component.

    <?xml version='1.0' encoding='UTF-8'?>
      <ui:composition xmlns:ui="http://java.sun.com/jsf/facelets">
        <amis-person amis-person-name="NAME" amis-person-email="EMAIL" />
      </ui:composition>
    

    In the jsf page, create an iterator for the viewmodel and put the page fragment within the iterator

     <af:iterator id="iterator" value="#{bindings.EmployeesVO.collectionModel}" var="item">
        <ui:include src="/fragments/amis-person-fragment.jsff"/>
     </af:iterator>
    

    Change the bindings in the page fragment to match the output of the iterator

     <amis-person amis-person-name="#{item.bindings.FirstName.inputValue}" amis-person-email="#{item.bindings.Email.inputValue}" />
    

    That’s it, you are done. When I now run the project I see the data from the Employee Viewmodel in the Jet Component I made:

     

    Sources
    1. http://andrejusb.blogspot.nl/2015/12/improved-jet-rendering-in-adf.html
    2. https://blogs.oracle.com/groundside/entry/jet_composite_components_i_backgrounder (and the other blogs)
    3. ADFJetDemo Application or Github
    Versions used

    JDeveloper 12.1.3,
    OracleJet V2.2.0

    Disclaimer

    The information is based on my personal research. At the moment, Oracle does not support or encourage integrating ADF and Jet. Oracle is working on JET Composite Components in ADF.

    There is also a second part on how to do this, but then with DVTs.

    The post Getting ADF Data in a Jet Component (1) appeared first on AMIS Oracle and Java Blog.

    Getting started with Oracle PaaS Service Manager Command Line Interface (PSM)

    Mon, 2017-03-06 23:41

    Oracle PaaS Service Manager (PSM) provides a command line interface (CLI) with which you can manage the lifecycle of various services in Oracle Public Cloud. This opens the door to scripting (recurring) tasks, from (re)deployment of applications on ACCS to provisioning new environments. PSM makes performing admin operations on the Oracle Public Cloud a lot easier and more efficient than using the GUI.

    Note that the CLI is a thin wrapper over PaaS REST APIs that invokes these APIs to support common PaaS features.

    The steps for installing and configuring PSM are simple enough – and take about 6 minutes. I will briefly walk you through them. They are also documented just fine.  Before I continue, I want to thank Abhijit Ramchandra Jere of Oracle for graciously helping me out with PSM.

    1. Install Python (3.3+) and cURL

    PSM is a Python based tool. To set it up and run it, you need to have Python set up on your machine.

    2. Download PSM

    The psmcli.zip can be downloaded from Cloud UI (as described here) or it can be fetched through cURL from the REST API (as described here):

    curl -I -X GET -u "lucas.jellema:password" -H "X-ID-TENANT-NAME: cloud17" -H "Accept: application/json" https://psm.us.oraclecloud.com/paas/api/v1.1/instancemgmt/cloud17/services/cs/instances

    3. Install PSM as a Python Package

    With a simple statement, PSM is installed from the downloaded zip file (see here)

    pip install -U psmcli.zip

    This installs PSM into the Python Scripts directory.

    Verify the result using

    pip show psmcli

     

    4. Configure PSM for the identity domain and the connection to the cloud

    Run the setup for PSM and configure it for use with your identity domain (see docs). Note: this step assumes that the Python scripts directory that contains PSM is in the PATH environment variable.

    psm setup

    I am not sure if and how you can use PSM on your machine for multiple identity domains or user accounts. I have access to several Oracle Public Cloud identity domains, even in different data centers, and I have now set up PSM for one of them. Whether I can run the setup for a second identity domain and then somehow switch between the two is not yet clear to me.
    EDIT: switching to a different identity domain is simply done by running psm setup again. I need to provide the identity domain, region and credentials to make the switch. Note: psm remembers the setup across restarts of the operating system.

    5. Start using PSM for inspecting and manipulating PaaS Services

    PSM can be used with many PaaS Services – not yet all – for inspecting their health, stopping and (re)starting, scaling and performing many administrative activities. See docs for all of them.

    Some examples:

    List all applications on Application Container Cloud:

    psm accs apps

    List log details for a specific application on ACCS:

    psm accs log -n|--app-name name -i|--instance-name name

    psm accs log -n Artist-Enricher-API -i web.1

    and the list of log files is presented

     

    6. Update PSM

    To get rid of the slightly annoying message about there being a later version of PSM available, and to get hold of the latest version, you simply type:

    psm update

    and wait for maybe 15 seconds.

     

    Issues:

    I ran into an issue, caused as it turned out by having multiple Python versions on my machine. PSM got installed as Python package with version 3.5 and I was trying to run PSM with Python 3.6 as first version in my PATH environment variable. Clearly, that failed.

    The error I ran into: ModuleNotFoundError: No module named 'opaascli'

    The solution: I removed all but one Python version (3.5 because with 3.6 the installation did not go well because of missing pip) and then installed with that one version.

    Resources

    Documentation on PSM: http://docs.oracle.com/en/cloud/paas/java-cloud/pscli/abouit-paas-service-manager-command-line-interface.html

    Documentation on Oracle PaaS REST APIs: https://apicatalog.oraclecloud.com/ui/

    The post Getting started with Oracle PaaS Service Manager Command Line Interface (PSM) appeared first on AMIS Oracle and Java Blog.

    Runtime version of Node on Oracle Application Container Cloud (0.12 vs. 6.9.1) – easy single click upgrade

    Wed, 2017-03-01 23:32

    Yesterday, I created a new application on Oracle Application Container Cloud. The application type is Node (fka Node.js), so the container type I had created was Node, rather than PHP or Java SE, the other two options currently available to me. I was a little dismayed to learn that the runtime Node version that my container was created with was (still) 0.12.17. I had assumed that by now ACCS would have moved to a more recent version of Node.

    Today, after a little closer inspection, I realized that upgrading the runtime (version of Node) is actually very simple to do on ACCS. Go to the Administration tab in the application overview.

    Two later versions for the Node runtime are listed: 4.6.1 and 6.9.1. Now we are talking! I can simply click on the Update button to have the runtime upgraded. I can then choose between the fast update, with some brief downtime, or the rolling upgrade, which does not affect the service of my application but takes longer to complete.

    I click on Restart. The UI informs me of the current action:

    And I can track the in progress activity:

    The overall upgrade took several minutes to complete, somewhat longer than I had expected. However, it took me no more effort than clicking a button. And it did not impact my consumers. All in all, pretty smooth. And now I am on v6.9.1, which is pretty up to date.

    I am not sure whether, during the initial creation of the application, I had the option to start out with this recent version of Node rather than the fairly old v0.12 that was provisioned initially. If so, I missed it completely, and it should be made more obvious. If I did not get the choice, then I believe that is a missed opportunity that Oracle may want to address in this cloud service.

    The post Runtime version of Node on Oracle Application Container Cloud (0.12 vs. 6.9.1) – easy single click upgrade appeared first on AMIS Oracle and Java Blog.

    Oracle Service Bus : disable / enable a proxy service via WebLogic Server MBeans with JMX

    Tue, 2017-02-28 04:22

    At a public sector organization in the Netherlands, an OSB proxy service was reading messages (via JMS) from a WebLogic queue. These messages were then sent to a back-end system. Every evening, during a certain time period, the back-end system was down. For that reason, and also for planned maintenance, it had to be possible to stop and start sending messages from the queue to the back-end system. Hence, a script was needed to disable/enable the OSB proxy service (deployed on OSB 11.1.1.7).

    This article will explain how the OSB proxy service can be disabled/enabled via WebLogic Server MBeans with JMX.

    A managed bean (MBean) is a Java object that represents a Java Management Extensions (JMX) manageable resource in a distributed environment, such as an application, a service, a component, or a device.

    First, a high-level overview of the MBeans is given. For further information see “Fusion Middleware Developing Custom Management Utilities With JMX for Oracle WebLogic Server”, via url: https://docs.oracle.com/cd/E28280_01/web.1111/e13728/toc.htm

    Next the structure and use of the System MBean Browser in the Oracle Enterprise Manager Fusion Middleware Control is discussed.

    Finally the code to disable/enable the OSB proxy service is shown.

    WebLogic Scripting Tool (WLST) can also be used to disable/enable an OSB proxy service, but in this case (also because of my Java developer skills) JMX was used. For more information have a look for example at AMIS TECHNOLOGY BLOG: “Oracle Service Bus: enable / disable proxy service with WLST”, via url: https://technology.amis.nl/2011/01/10/oracle-service-bus-enable-disable-proxy-service-with-wlst/

    The Java Management Extensions (JMX) technology is a standard part of the Java Platform, Standard Edition (Java SE platform). The JMX technology was added to the platform in the Java 2 Platform, Standard Edition (J2SE) 5.0 release.

    The JMX technology provides a simple, standard way of managing resources such as applications, devices, and services. Because the JMX technology is dynamic, you can use it to monitor and manage resources as they are created, installed and implemented. You can also use the JMX technology to monitor and manage the Java Virtual Machine (Java VM).
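
    To make the MBean idea concrete before turning to WebLogic, here is a minimal sketch that uses only the JDK. The ServiceSwitch class, its ServiceSwitchMBean interface and the nl.example object name are hypothetical and made up for this illustration; they are not part of WebLogic or OSB. The sketch registers a standard MBean in the JVM's platform MBean server, calls an operation through a typed proxy and reads an attribute back.

```java
import java.lang.management.ManagementFactory;

import javax.management.JMException;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxBasicsSketch {

    // Management interface. For a standard MBean the interface name is
    // the implementing class name followed by "MBean".
    public interface ServiceSwitchMBean {
        boolean isEnabled(); // exposed as the read-only attribute "Enabled"
        void enable();       // exposed as an operation
        void disable();      // exposed as an operation
    }

    // Hypothetical managed resource: a simple on/off switch.
    public static class ServiceSwitch implements ServiceSwitchMBean {
        private volatile boolean enabled = true;

        @Override public boolean isEnabled() { return enabled; }
        @Override public void enable() { enabled = true; }
        @Override public void disable() { enabled = false; }
    }

    // Registers the MBean, disables it through a proxy and returns the
    // value of the "Enabled" attribute as seen through the MBean server.
    public static boolean disableViaJmx() {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            // Hypothetical object name, chosen for this example only.
            ObjectName name = new ObjectName("nl.example:type=ServiceSwitch,name=demo");
            mbs.registerMBean(new ServiceSwitch(), name);
            try {
                ServiceSwitchMBean proxy =
                    JMX.newMBeanProxy(mbs, name, ServiceSwitchMBean.class);
                proxy.disable();
                return (Boolean) mbs.getAttribute(name, "Enabled");
            } finally {
                // Clean up so the method can be called more than once.
                mbs.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("enabled after disable(): " + disableViaJmx());
    }
}
```

    The WebLogic code later in this article follows the same pattern, only with a remote MBeanServerConnection instead of the local platform MBean server.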

    For another example of using MBeans with JMX, I kindly point you to another article (written by me) on the AMIS TECHNOLOGY BLOG: “Doing performance measurements of an OSB Proxy Service by programmatically extracting performance metrics via the ServiceDomainMBean and presenting them as an image via a PowerPoint VBA module”, via url: https://technology.amis.nl/2016/01/30/performance-measurements-of-an-osb-proxy-service-by-using-the-servicedomainmbean/

    Basic Organization of a WebLogic Server Domain

    As you probably already know a WebLogic Server administration domain is a collection of one or more servers and the applications and resources that are configured to run on the servers. Each domain must include a special server instance that is designated as the Administration Server. The simplest domain contains a single server instance that acts as both Administration Server and host for applications and resources. This domain configuration is commonly used in development environments. Domains for production environments usually contain multiple server instances (Managed Servers) running independently or in groups called clusters. In such environments, the Administration Server does not host production applications.

    Separate MBean Types for Monitoring and Configuring

    All WebLogic Server MBeans can be organized into one of the following general types based on whether the MBean monitors or configures servers and resources:

    • Runtime MBeans contain information about the run-time state of a server and its resources. They generally contain only data about the current state of a server or resource, and they do not persist this data. When you shut down a server instance, all run-time statistics and metrics from the run-time MBeans are destroyed.
    • Configuration MBeans contain information about the configuration of servers and resources. They represent the information that is stored in the domain’s XML configuration documents.
    • Configuration MBeans for system modules contain information about the configuration of services such as JDBC data sources and JMS topics that have been targeted at the system level. Instead of targeting these services at the system level, you can include services as modules within an application. These application-level resources share the life cycle and scope of the parent application. However, WebLogic Server does not provide MBeans for application modules.

    MBean Servers

    At the core of any JMX agent is the MBean server, which acts as a container for MBeans.

    The JVM for an Administration Server maintains three MBean servers provided by Oracle and optionally maintains the platform MBean server, which is provided by the JDK itself. The JVM for a Managed Server maintains only one Oracle MBean server and the optional platform MBean server.

    MBean Server: Creates, registers, and provides access to…

    • Domain Runtime MBean Server: MBeans for domain-wide services. This MBean server also acts as a single point of access for MBeans that reside on Managed Servers. Only the Administration Server hosts an instance of this MBean server.
    • Runtime MBean Server: MBeans that expose monitoring, run-time control, and the active configuration of a specific WebLogic Server instance. In release 11.1.1.7, the WebLogic Server Runtime MBean Server is configured by default to be the platform MBean server. Each server in the domain hosts an instance of this MBean server.
    • Edit MBean Server: Pending configuration MBeans and operations that control the configuration of a WebLogic Server domain. It exposes a ConfigurationManagerMBean for locking, saving, and activating changes. Only the Administration Server hosts an instance of this MBean server.
    • The JVM’s platform MBean server: MBeans provided by the JDK that contain monitoring information for the JVM itself. You can register custom MBeans in this MBean server. In release 11.1.1.7, WebLogic Server uses the JVM’s platform MBean server to contain the WebLogic run-time MBeans by default.

    Service MBeans

    Within each MBean server, WebLogic Server registers a service MBean under a simple object name. The attributes and operations in this MBean serve as your entry point into the WebLogic Server MBean hierarchies and enable JMX clients to navigate to all WebLogic Server MBeans in an MBean server after supplying only a single object name.

    MBean Server, Service MBean and JMX object name:

    • The Domain Runtime MBean Server: DomainRuntimeServiceMBean. Provides access to MBeans for domain-wide services such as application deployment, JMS servers, and JDBC data sources. It also is a single point for accessing the hierarchies of all run-time MBeans and all active configuration MBeans for all servers in the domain. JMX object name: com.bea:Name=DomainRuntimeService,Type=weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean
    • Runtime MBean Servers: RuntimeServiceMBean. Provides access to run-time MBeans and active configuration MBeans for the current server. JMX object name: com.bea:Name=RuntimeService,Type=weblogic.management.mbeanservers.runtime.RuntimeServiceMBean
    • The Edit MBean Server: EditServiceMBean. Provides the entry point for managing the configuration of the current WebLogic Server domain. JMX object name: com.bea:Name=EditService,Type=weblogic.management.mbeanservers.edit.EditServiceMBean

    Choosing an MBean Server

    If your client monitors run-time MBeans for multiple servers, or if your client runs in a separate JVM, Oracle recommends that you connect to the Domain Runtime MBean Server on the Administration Server instead of connecting separately to each Runtime MBean Server on each server instance in the domain.

    The trade off for directing all JMX requests through the Domain Runtime MBean Server is a slight degradation in performance due to network latency and increased memory usage. However, for most network topologies and performance requirements, the simplified code maintenance and enhanced security that the Domain Runtime MBean Server enables is preferable.
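
    As a rough sketch of what "connecting to the Domain Runtime MBean Server" comes down to, the class below builds the two addresses a JMX client needs: the JMX service URL of the MBean server and the object name of its service MBean. It uses only JDK classes. The JNDI name and object name are the values listed in this article; the t3 protocol and the /jndi/ prefix are assumptions based on the usual WebLogic client setup, and the class name DomainRuntimeAddressSketch is made up. Actually opening the connection additionally requires the WebLogic client library on the classpath, which is not shown here.

```java
import java.net.MalformedURLException;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXServiceURL;

public class DomainRuntimeAddressSketch {

    // Assumption: WebLogic MBean servers are looked up under /jndi/.
    static final String JNDI_ROOT = "/jndi/";

    // JNDI name of the Domain Runtime MBean Server (value listed in this article).
    static final String DOMAIN_RUNTIME_JNDI_NAME =
        "weblogic.management.mbeanservers.domainruntime";

    // Object name of the DomainRuntimeServiceMBean (value listed in this article).
    static final String DOMAIN_RUNTIME_SERVICE_NAME =
        "com.bea:Name=DomainRuntimeService,"
        + "Type=weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean";

    // Builds the service URL for the Administration Server.
    // Assumption: the t3 protocol is used; adjust for t3s, iiop or http.
    public static JMXServiceURL serviceUrl(String adminHost, int adminPort) {
        try {
            return new JMXServiceURL("t3", adminHost, adminPort,
                                     JNDI_ROOT + DOMAIN_RUNTIME_JNDI_NAME);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid host or port", e);
        }
    }

    // Builds the single object name from which all other MBeans are reached.
    public static ObjectName serviceName() {
        try {
            return new ObjectName(DOMAIN_RUNTIME_SERVICE_NAME);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serviceUrl("localhost", 7001));
        System.out.println(serviceName().getKeyProperty("Name"));
    }
}
```

    Because every request goes through this one URL on the Administration Server, the client does not need to know the host and port of each Managed Server.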

    System MBean Browser

    Oracle Enterprise Manager Fusion Middleware Control provides the System MBean Browser for managing MBeans that perform specific monitoring and configuration tasks.

    Via the Oracle Enterprise Manager Fusion Middleware Control for a certain domain, the System MBean Browser can be opened.

    Here the previously mentioned types of MBeans can be seen: Runtime MBeans and Configuration MBeans:

    When navigating to “Configuration MBeans | com.bea”, the previously mentioned EditServiceMBean can be found:

    When navigating to “Runtime MBeans | com.bea | Domain: <a domain>”, the previously mentioned DomainRuntimeServiceMBean can be found:

    The MBeans mentioned later in this article can also be found:

    For example, for the ProxyServiceConfigurationMBean, the available operations can be found:

    When navigating to “Runtime MBeans | com.bea”, within each Server the previously mentioned RuntimeServiceMBean can be found.

     

    Code to disable/enable the OSB proxy service

    The requirement to be able to stop and start sending messages to the back-end system from the queue was implemented by disabling/enabling the state of the OSB Proxy service JMSConsumerStuFZKNMessageService_PS.

    Shortly before the back-end system goes down, dequeuing of the queue should be disabled.
    Right after the back-end system goes up again, dequeuing of the queue should be enabled.

    The state of the OSB Proxy service can be seen in the Oracle Service Bus Administration 11g Console (for example via the Project Explorer) in the tab “Operational Settings” of the proxy service.

    For ease of use, two ms-dos batch files were created, each using MBeans, to change the state of a service (proxy service or business service). As stated before, the WebLogic Server contains a set of MBeans that can be used to configure, monitor and manage WebLogic Server resources.

    • Disable_JMSConsumerStuFZKNMessageService_PS.bat

    On the server where the back-end system resides, the ms-dos batch file “Disable_JMSConsumerStuFZKNMessageService_PS.bat” is called.

    The content of the batch file is:

    java.exe -classpath "OSBServiceState.jar;com.bea.common.configfwk_1.7.0.0.jar;sb-kernel-api.jar;sb-kernel-impl.jar;wlfullclient.jar" nl.xyz.osbservice.osbservicestate.OSBServiceState "xyz" "7001" "weblogic" "xyz" "ProxyService" "JMSConsumerStuFZKNMessageService-1.0/proxy/JMSConsumerStuFZKNMessageService_PS" "Disable"

    • Enable_JMSConsumerStuFZKNMessageService_PS.bat

    On the server where the back-end system resides, the ms-dos batch file “Enable_JMSConsumerStuFZKNMessageService_PS.bat” is called.

    The content of the batch file is:

    java.exe -classpath "OSBServiceState.jar;com.bea.common.configfwk_1.7.0.0.jar;sb-kernel-api.jar;sb-kernel-impl.jar;wlfullclient.jar" nl.xyz.osbservice.osbservicestate.OSBServiceState "xyz" "7001" "weblogic" "xyz" "ProxyService" "JMSConsumerStuFZKNMessageService-1.0/proxy/JMSConsumerStuFZKNMessageService_PS" "Enable"

    In both ms-dos batch files, java.exe calls a class named OSBServiceState. The main method of this class expects the following parameters:

    Parameter name: Description

    • HOSTNAME: Host name of the AdminServer
    • PORT: Port of the AdminServer
    • USERNAME: Username
    • PASSWORD: Password
    • SERVICETYPE: Type of resource. Possible values are ProxyService and BusinessService.
    • SERVICEURI: Identifier of the resource. The name begins with the project name, followed by folder names and ending with the resource name.
    • ACTION: The action to be carried out. Possible values are Enable and Disable.
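
    The parameter handling can be sketched as follows. This is only an illustration under assumptions: the class ServiceStateArgsSketch and its validation rules are hypothetical and are not the main method of OSBServiceState. It maps the seven positional arguments of the batch files onto the parameter names above, rejects unknown SERVICETYPE and ACTION values, and splits the SERVICEURI into its name components (project, folders, resource).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ServiceStateArgsSketch {

    // Positional order of the arguments, as used in the batch files.
    static final List<String> PARAMETER_NAMES = Arrays.asList(
        "HOSTNAME", "PORT", "USERNAME", "PASSWORD",
        "SERVICETYPE", "SERVICEURI", "ACTION");

    // Maps positional arguments to named parameters and validates them.
    public static Map<String, String> parse(String[] args) {
        if (args.length != PARAMETER_NAMES.size()) {
            throw new IllegalArgumentException(
                "Expected parameters: " + PARAMETER_NAMES);
        }
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            props.put(PARAMETER_NAMES.get(i), args[i]);
        }
        requireOneOf(props, "SERVICETYPE", "ProxyService", "BusinessService");
        requireOneOf(props, "ACTION", "Enable", "Disable");
        return props;
    }

    // Splits a service URI into project name, folder names and resource name.
    public static String[] nameComponents(String serviceUri) {
        return serviceUri.split("/");
    }

    // Fails fast when a parameter holds a value that is not supported.
    private static void requireOneOf(Map<String, String> props, String key,
                                     String... allowed) {
        String value = props.get(key);
        for (String candidate : allowed) {
            if (candidate.equalsIgnoreCase(value)) {
                return;
            }
        }
        throw new IllegalArgumentException(
            "Unsupported value for " + key + ": " + value);
    }

    public static void main(String[] args) {
        Map<String, String> props = parse(new String[] {
            "xyz", "7001", "weblogic", "xyz", "ProxyService",
            "JMSConsumerStuFZKNMessageService-1.0/proxy/JMSConsumerStuFZKNMessageService_PS",
            "Disable"});
        System.out.println(props.get("ACTION") + " "
            + Arrays.toString(nameComponents(props.get("SERVICEURI"))));
    }
}
```

    Validating the action before anything is sent to the server avoids creating an OSB session for a request that cannot be carried out.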

    Every change is carried out in its own session (via the SessionManagementMBean), which is automatically activated with description: OSBServiceState_script_<systemdatetime>

    This can be seen via the Change Center | View Changes of the Oracle Service Bus Administration 11g Console:

    The response from “Disable_JMSConsumerStuFZKNMessageService_PS.bat” is:

    Disabling service JMSConsumerStuFZKNMessageService-1.0/proxy/JMSConsumerStuFZKNMessageService_PS has been succesfully completed

    In the Oracle Service Bus Administration 11g Console this change can be found as a Task:

    The result of changing the state of the OSB Proxy service can be checked in the Oracle Service Bus Administration 11g Console.

    The same applies when using “Enable_JMSConsumerStuFZKNMessageService_PS.bat”.

    In the sample code below the use of the following MBeans can be seen:

    • DomainRuntimeServiceMBean: Provides a common access point for navigating to all runtime and configuration MBeans in the domain as well as to MBeans that provide domain-wide services (such as controlling and monitoring the life cycles of servers and message-driven EJBs and coordinating the migration of migratable services). [https://docs.oracle.com/middleware/1213/wls/WLAPI/weblogic/management/mbeanservers/domainruntime/DomainRuntimeServiceMBean.html]

    The wlfullclient.jar library that contains this MBean is not provided by default in a WebLogic install and must be built. The simple way to do this is described in
    “Fusion Middleware Programming Stand-alone Clients for Oracle WebLogic Server, Using the WebLogic JarBuilder Tool”, which can be reached via url: https://docs.oracle.com/cd/E28280_01/web.1111/e13717/jarbuilder.htm#SACLT240.

    • SessionManagementMBean: Provides API to create, activate or discard sessions. [http://docs.oracle.com/cd/E13171_01/alsb/docs26/javadoc/com/bea/wli/sb/management/configuration/SessionManagementMBean.html]

    • ProxyServiceConfigurationMBean: Provides API to enable/disable services and enable/disable monitoring for a proxy service. [https://docs.oracle.com/cd/E13171_01/alsb/docs26/javadoc/com/bea/wli/sb/management/configuration/ProxyServiceConfigurationMBean.html]

    • BusinessServiceConfigurationMBean: Provides API for managing business services. [https://docs.oracle.com/cd/E13171_01/alsb/docs25/javadoc/com/bea/wli/sb/management/configuration/BusinessServiceConfigurationMBean.html]

    Once the connection to the DomainRuntimeServiceMBean is made, other MBeans can be found via the findService method.

    Service findService(String name,
                        String type,
                        String location)
    

    This method returns the Service on the specified Server or in the primary MBeanServer if the location is not specified.

    In the code example below certain java fields are used. For reading purposes the field values are shown in the following table:

    Field: Field value

    • DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME: weblogic.management.mbeanservers.domainruntime
    • DomainRuntimeServiceMBean.OBJECT_NAME: com.bea:Name=DomainRuntimeService,Type=weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean
    • SessionManagementMBean.NAME: SessionManagement
    • SessionManagementMBean.TYPE: com.bea.wli.sb.management.configuration.SessionManagementMBean
    • ProxyServiceConfigurationMBean.NAME: ProxyServiceConfiguration
    • ProxyServiceConfigurationMBean.TYPE: com.bea.wli.sb.management.configuration.ProxyServiceConfigurationMBean
    • BusinessServiceConfigurationMBean.NAME: BusinessServiceConfiguration
    • BusinessServiceConfigurationMBean.TYPE: com.bea.wli.sb.management.configuration.BusinessServiceConfigurationMBean

    Because of the use of com.bea.wli.config.Ref.class, the following library was needed: <Middleware Home Directory>/Oracle_OSB1/modules/com.bea.common.configfwk_1.7.0.0.jar

    Because of the use of weblogic.management.jmx.MBeanServerInvocationHandler.class, the following library was needed: <Middleware Home Directory>/wlserver_10.3/server/lib/wlfullclient.jar

    When running the code the following error was thrown:

    java.lang.RuntimeException: java.lang.ClassNotFoundException: com.bea.wli.sb.management.configuration.DelegatedSessionManagementMBean
    	at weblogic.management.jmx.MBeanServerInvocationHandler.newProxyInstance(MBeanServerInvocationHandler.java:621)
    	at weblogic.management.jmx.MBeanServerInvocationHandler.invoke(MBeanServerInvocationHandler.java:418)
    	at $Proxy0.findService(Unknown Source)
    	at nl.xyz.osbservice.osbservicestate.OSBServiceState.<init>(OSBServiceState.java:66)
    	at nl.xyz.osbservice.osbservicestate.OSBServiceState.main(OSBServiceState.java:217)
    Caused by: java.lang.ClassNotFoundException: com.bea.wli.sb.management.configuration.DelegatedSessionManagementMBean
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    	at weblogic.management.jmx.MBeanServerInvocationHandler.newProxyInstance(MBeanServerInvocationHandler.java:619)
    	... 4 more
    Process exited.
    

    So because of the use of com.bea.wli.sb.management.configuration.DelegatedSessionManagementMBean.class the following library <Middleware Home Directory>/Oracle_OSB1/lib/sb-kernel-impl.jar was also needed.

    package nl.xyz.osbservice.osbservicestate;
    
    
    import com.bea.wli.config.Ref;
    import com.bea.wli.sb.management.configuration.BusinessServiceConfigurationMBean;
    import com.bea.wli.sb.management.configuration.ProxyServiceConfigurationMBean;
    import com.bea.wli.sb.management.configuration.SessionManagementMBean;
    
    import java.io.IOException;
    
    import java.net.MalformedURLException;
    
    import java.util.HashMap;
    import java.util.Hashtable;
    import java.util.Properties;
    
    import javax.management.MBeanServerConnection;
    import javax.management.MalformedObjectNameException;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    
    import javax.naming.Context;
    
    import weblogic.management.jmx.MBeanServerInvocationHandler;
    import weblogic.management.mbeanservers.domainruntime.DomainRuntimeServiceMBean;
    
    
    public class OSBServiceState {
        private static MBeanServerConnection connection;
        private static JMXConnector connector;
    
        public OSBServiceState(HashMap props) {
            super();
            SessionManagementMBean sessionManagementMBean = null;
            String sessionName =
                "OSBServiceState_script_" + System.currentTimeMillis();
            String servicetype;
            String serviceURI;
            String action;
            String description = "";
    
    
            try {
    
                Properties properties = new Properties();
                properties.putAll(props);
    
                initConnection(properties.getProperty("HOSTNAME"),
                               properties.getProperty("PORT"),
                               properties.getProperty("USERNAME"),
                               properties.getProperty("PASSWORD"));
    
                servicetype = properties.getProperty("SERVICETYPE");
                serviceURI = properties.getProperty("SERVICEURI");
                action = properties.getProperty("ACTION");
    
                DomainRuntimeServiceMBean domainRuntimeServiceMBean =
                    (DomainRuntimeServiceMBean)findDomainRuntimeServiceMBean(connection);
    
                // Create a session via SessionManagementMBean.
                sessionManagementMBean =
                        (SessionManagementMBean)domainRuntimeServiceMBean.findService(SessionManagementMBean.NAME,
                                                                                      SessionManagementMBean.TYPE,
                                                                                      null);
                sessionManagementMBean.createSession(sessionName);
    
                if (servicetype.equalsIgnoreCase("ProxyService")) {
    
                    // A Ref uniquely represents a resource, project or folder that is managed by the Configuration Framework.
                    // A Ref object has two components: A typeId that indicates whether it is a project, folder, or a resource, and an array of names of non-zero length.
                    // For a resource the array of names start with the project name, followed by folder names, and end with the resource name.
                    // For a project, the Ref object simply contains one name component, that is, the project name.
                    // A Ref object for a folder contains the project name followed by the names of the folders which it is nested under.
                    Ref ref = constructRef("ProxyService", serviceURI);
    
                    ProxyServiceConfigurationMBean proxyServiceConfigurationMBean =
                        (ProxyServiceConfigurationMBean)domainRuntimeServiceMBean.findService(ProxyServiceConfigurationMBean.NAME +
                                                                                              "." +
                                                                                              sessionName,
                                                                                              ProxyServiceConfigurationMBean.TYPE,
                                                                                              null);
                    if (action.equalsIgnoreCase("Enable")) {
                        proxyServiceConfigurationMBean.enableService(ref);
                        description = "Enabled the service: " + serviceURI;
                        System.out.print("Enabling service " + serviceURI);
                    } else if (action.equalsIgnoreCase("Disable")) {
                        proxyServiceConfigurationMBean.disableService(ref);
                        description = "Disabled the service: " + serviceURI;
                        System.out.print("Disabling service " + serviceURI);
                    } else {
                        System.out.println("Unsupported value for ACTION");
                    }
                } else if (servicetype.equals("BusinessService")) {
                    Ref ref = constructRef("BusinessService", serviceURI);
    
                    BusinessServiceConfigurationMBean businessServiceConfigurationMBean =
                        (BusinessServiceConfigurationMBean)domainRuntimeServiceMBean.findService(BusinessServiceConfigurationMBean.NAME +
                                                                                                 "." +
                                                                                                 sessionName,
                                                                                                 BusinessServiceConfigurationMBean.TYPE,
                                                                                                 null);
                    if (action.equalsIgnoreCase("Enable")) {
                        businessServiceConfigurationMBean.enableService(ref);
                        description = "Enabled the service: " + serviceURI;
                        System.out.print("Enabling service " + serviceURI);
                    } else if (action.equalsIgnoreCase("Disable")) {
                        businessServiceConfigurationMBean.disableService(ref);
                        description = "Disabled the service: " + serviceURI;
                        System.out.print("Disabling service " + serviceURI);
                    } else {
                        System.out.println("Unsupported value for ACTION");
                    }
                }
                sessionManagementMBean.activateSession(sessionName, description);
                System.out.println(" has been successfully completed");
            } catch (Exception ex) {
                if (sessionManagementMBean != null) {
                    try {
                       sessionManagementMBean.discardSession(sessionName);
                        System.out.println(" resulted in an error.");
                    } catch (Exception e) {
                        System.out.println("Unable to discard session: " +
                                           sessionName);
                    }
                }
    
                ex.printStackTrace();
            } finally {
                if (connector != null)
                    try {
                        connector.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
            }
        }
    
    
        /*
           * Initialize connection to the Domain Runtime MBean Server.
           */
    
        public static void initConnection(String hostname, String portString,
                                          String username,
                                          String password) throws IOException,
                                                                  MalformedURLException {
    
            String protocol = "t3";
            Integer portInteger = Integer.valueOf(portString);
            int port = portInteger.intValue();
            String jndiroot = "/jndi/";
            String mbeanserver = DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME;
    
            JMXServiceURL serviceURL =
                new JMXServiceURL(protocol, hostname, port, jndiroot +
                                  mbeanserver);
    
            Hashtable hashtable = new Hashtable();
            hashtable.put(Context.SECURITY_PRINCIPAL, username);
            hashtable.put(Context.SECURITY_CREDENTIALS, password);
            hashtable.put(JMXConnectorFactory.PROTOCOL_PROVIDER_PACKAGES,
                          "weblogic.management.remote");
            hashtable.put("jmx.remote.x.request.waiting.timeout", Long.valueOf(10000L));
    
            connector = JMXConnectorFactory.connect(serviceURL, hashtable);
            connection = connector.getMBeanServerConnection();
        }
    
    
        private static Ref constructRef(String refType, String serviceURI) {
            Ref ref = null;
            String[] uriData = serviceURI.split("/");
            ref = new Ref(refType, uriData);
            return ref;
        }
    
    
        /**
         * Finds the specified MBean object
         *
         * @param connection - A connection to the MBeanServer.
         * @return Object - The MBean or null if the MBean was not found.
         */
        public Object findDomainRuntimeServiceMBean(MBeanServerConnection connection) {
            try {
                ObjectName objectName =
                    new ObjectName(DomainRuntimeServiceMBean.OBJECT_NAME);
                return (DomainRuntimeServiceMBean)MBeanServerInvocationHandler.newProxyInstance(connection,
                                                                                                objectName);
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
                return null;
            }
        }
    
    
        public static void main(String[] args) {
            try {
                if (args.length < 7) {
                    // All seven positional arguments are required; args[6] is read below.
                    System.out.println("Provide values for the following parameters: HOSTNAME, PORT, USERNAME, PASSWORD, SERVICETYPE, SERVICEURI, ACTION.");
    
                } else {
                    HashMap<String, String> map = new HashMap<String, String>();
    
                    map.put("HOSTNAME", args[0]);
                    map.put("PORT", args[1]);
                    map.put("USERNAME", args[2]);
                    map.put("PASSWORD", args[3]);
                    map.put("SERVICETYPE", args[4]);
                    map.put("SERVICEURI", args[5]);
                    map.put("ACTION", args[6]);
                    OSBServiceState osbServiceState = new OSBServiceState(map);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
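
    The two addressing steps in the listing above (splitting the service URI into Ref name components, and building the JMX service URL) can be tried without a WebLogic installation. The following is a minimal sketch, not the author's code: the class name OsbAddressingSketch and the example project and service names are made up for illustration, and the JNDI name passed in main is assumed to match the value of DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME.

```java
import java.net.MalformedURLException;
import java.util.Arrays;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper class, for illustration only.
class OsbAddressingSketch {

    // Same split as constructRef: project name first, then folders, then the resource name.
    static String[] refNames(String serviceURI) {
        return serviceURI.split("/");
    }

    // Same URL layout as initConnection, but only builds the URL; it does not connect.
    static JMXServiceURL serviceUrl(String host, int port, String mbeanServerJndiName) {
        try {
            return new JMXServiceURL("t3", host, port, "/jndi/" + mbeanServerJndiName);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid host, port or JNDI name", e);
        }
    }

    public static void main(String[] args) {
        // Example service URI (assumed): project / folder / proxy service name.
        System.out.println(Arrays.toString(refNames("MyProject/ProxyServices/MyProxy")));
        // JNDI name assumed to equal DomainRuntimeServiceMBean.MBEANSERVER_JNDI_NAME.
        System.out.println(serviceUrl("localhost", 7001,
                "weblogic.management.mbeanservers.domainruntime"));
    }
}
```

    Connecting with such a URL still requires the WebLogic client classes on the classpath, because the t3 protocol provider comes from the weblogic.management.remote package.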
    
    

    The post Oracle Service Bus : disable / enable a proxy service via WebLogic Server MBeans with JMX appeared first on AMIS Oracle and Java Blog.

    Dump Oracle data into a delimited ascii file with PL/SQL

    Fri, 2017-02-24 08:30

    This is how I dump data from an Oracle Database (tested on 8i, 9i, 10g, 11g and 12c) to a delimited ASCII file:

    SQL*Plus: Release 12.1.0.2.0 Production on Fri Feb 24 13:55:47 2017
    Copyright (c) 1982, 2014, Oracle.  All rights reserved.
    
    Connected to:
    Oracle Database 12c Standard Edition Release 12.1.0.2.0 - 64bit Production
    
    SQL> set timing on
    SQL> select Dump_Delimited('select * from all_objects', 'all_objects.csv') nr_rows from dual;
    
       NR_ROWS
    ----------
         97116
    
    Elapsed: 00:00:11.87
    SQL> ! cat /u01/etl/report/all_objects_readme.txt
    
    
      *********************************************************************  
      Record Layout of file /u01/etl/report/all_objects.csv
      *********************************************************************  
    
    
      Column                          Sequence  MaxLength  Datatype  
      ------------------------------  --------  ---------  ----------  
    
      OWNER                           1         128        VARCHAR2                 
      OBJECT_NAME                     2         128        VARCHAR2                 
      SUBOBJECT_NAME                  3         128        VARCHAR2                 
      OBJECT_ID                       4         24         NUMBER                   
      DATA_OBJECT_ID                  5         24         NUMBER                   
      OBJECT_TYPE                     6         23         VARCHAR2                 
      CREATED                         7         20         DATE                     
      LAST_DDL_TIME                   8         20         DATE                     
      TIMESTAMP                       9         19         VARCHAR2                 
      STATUS                          10        7          VARCHAR2                 
      TEMPORARY                       11        1          VARCHAR2                 
      GENERATED                       12        1          VARCHAR2                 
      SECONDARY                       13        1          VARCHAR2                 
      NAMESPACE                       14        24         NUMBER                   
      EDITION_NAME                    15        128        VARCHAR2                 
      SHARING                         16        13         VARCHAR2                 
      EDITIONABLE                     17        1          VARCHAR2                 
      ORACLE_MAINTAINED               18        1          VARCHAR2                 
    
    
      ----------------------------------  
      Generated:     24-02-2017 13:56:50
      Generated by:  ETL
      Columns Count: 18
      Records Count: 97116
      Delimiter: ][
      Row Delimiter: ]
      ----------------------------------  
    
    SQL> 
    

    In addition to the query and the name of the generated file, the Dump_Delimited function takes six more parameters, each with a default value. Check out the PL/SQL; by the way, the basis for this code comes from Tom Kyte.

    SET DEFINE OFF;
    CREATE OR REPLACE DIRECTORY ETL_UNLOAD_DIR AS '/u01/etl/report';
    GRANT READ, WRITE ON DIRECTORY ETL_UNLOAD_DIR TO ETL;
    
    CREATE OR REPLACE FUNCTION Dump_Delimited
       ( P_query                IN VARCHAR2
       , P_filename             IN VARCHAR2
       , P_column_delimiter     IN VARCHAR2    := ']['
       , P_row_delimiter        IN VARCHAR2    := ']'
       , P_comment              IN VARCHAR2    := NULL
       , P_write_rec_layout     IN PLS_INTEGER := 1
       , P_dir                  IN VARCHAR2    := 'ETL_UNLOAD_DIR'
       , P_nr_is_pos_integer    IN PLS_INTEGER := 0 )
    RETURN PLS_INTEGER
     IS
        filehandle             UTL_FILE.FILE_TYPE;
        filehandle_rc          UTL_FILE.FILE_TYPE;
    
        v_user_name            VARCHAR2(100);
        v_file_name_full       VARCHAR2(200);
        v_dir                  VARCHAR2(200);
        v_total_length         PLS_INTEGER := 0;
        v_startpos             PLS_INTEGER := 0;
        v_datatype             VARCHAR2(30);
        v_delimiter            VARCHAR2(10):= P_column_delimiter;
        v_rowdelimiter         VARCHAR2(10):= P_row_delimiter;
    
        v_cursorid             PLS_INTEGER := DBMS_SQL.OPEN_CURSOR;
        v_columnvalue          VARCHAR2(4000);
        v_ignore               PLS_INTEGER;
        v_colcount             PLS_INTEGER := 0;
        v_newline              VARCHAR2(32676);
        v_desc_cols_table      DBMS_SQL.DESC_TAB;
        v_dateformat           NLS_SESSION_PARAMETERS.VALUE%TYPE;
        v_stat                 VARCHAR2(1000);
        counter                PLS_INTEGER := 0;
    BEGIN
    
        SELECT directory_path
          INTO v_dir 
        FROM DBA_DIRECTORIES
        WHERE directory_name = P_dir;
        v_file_name_full  := v_dir||'/'||P_filename;
    
        SELECT VALUE
          INTO v_dateformat
        FROM NLS_SESSION_PARAMETERS
        WHERE parameter = 'NLS_DATE_FORMAT';
    
        /* Use a date format that includes the time. */
        v_stat := 'alter session set nls_date_format=''dd-mm-yyyy hh24:mi:ss'' ';
        EXECUTE IMMEDIATE v_stat;
    
        filehandle := UTL_FILE.FOPEN( P_dir, P_filename, 'w', 32000 );
    
        /* Parse the input query so we can describe it. */
        DBMS_SQL.PARSE(  v_cursorid,  P_query, dbms_sql.native );
    
        /* Now, describe the outputs of the query. */
        DBMS_SQL.DESCRIBE_COLUMNS( v_cursorid, v_colcount, v_desc_cols_table );
    
        /* For each column, we need to define it, to tell the database
         * what we will fetch into. In this case, all data is going
         * to be fetched into a single varchar2(4000) variable.
         *
         * We will also adjust the max width of each column. 
         */
    IF P_write_rec_layout = 1 THEN
    
       filehandle_rc := UTL_FILE.FOPEN(P_dir, SUBSTR(P_filename,1, INSTR(P_filename,'.',-1)-1)||'_readme.txt', 'w');
    
    --Start Header
        v_newline := CHR(10)||CHR(10)||'  *********************************************************************  ';
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Record Layout of file '||v_file_name_full;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
        v_newline := '  *********************************************************************  '||CHR(10)||CHR(10);
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
        v_newline := '  Column                          Sequence  MaxLength  Datatype  ';
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
        v_newline := '  ------------------------------  --------  ---------  ----------  '||CHR(10);
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
    --End Header
    
    --Start Body
        FOR i IN 1 .. v_colcount
        LOOP
           DBMS_SQL.DEFINE_COLUMN( v_cursorid, i, v_columnvalue, 4000 );
           SELECT DECODE( v_desc_cols_table(i).col_type,  2, DECODE(v_desc_cols_table(i).col_precision,0,v_desc_cols_table(i).col_max_len,v_desc_cols_table(i).col_precision)+DECODE(P_nr_is_pos_integer,1,0,2)
                                                       , 12, 20, v_desc_cols_table(i).col_max_len )
             INTO v_desc_cols_table(i).col_max_len
           FROM dual;
    
           SELECT DECODE( TO_CHAR(v_desc_cols_table(i).col_type), '1'  , 'VARCHAR2'
                                                                , '2'  , 'NUMBER'
                                                                , '8'  , 'LONG'
                                                                , '11' , 'ROWID'
                                                                , '12' , 'DATE'
                                                                , '96' , 'CHAR'
                                                                , '108', 'USER_DEFINED_TYPE', TO_CHAR(v_desc_cols_table(i).col_type) )
             INTO v_datatype
           FROM DUAL;
    
           v_newline := RPAD('  '||v_desc_cols_table(i).col_name,34)||RPAD(i,10)||RPAD(v_desc_cols_table(i).col_max_len,11)||RPAD(v_datatype,25);
        UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
        END LOOP;
    --End Body
    
    ELSE
    
        FOR i IN 1 .. v_colcount LOOP
           DBMS_SQL.DEFINE_COLUMN( v_cursorid, i, v_columnvalue, 4000 );
           SELECT DECODE( v_desc_cols_table(i).col_type,  2, DECODE(v_desc_cols_table(i).col_precision,0,v_desc_cols_table(i).col_max_len,v_desc_cols_table(i).col_precision)+DECODE(P_nr_is_pos_integer,1,0,2)
                                                       , 12, 20, v_desc_cols_table(i).col_max_len )
             INTO v_desc_cols_table(i).col_max_len
           FROM dual;
         END LOOP;
    
    END IF;
    
        v_ignore := DBMS_SQL.EXECUTE(v_cursorid);
    
         WHILE ( DBMS_SQL.FETCH_ROWS(v_cursorid) > 0 )
         LOOP
            /* Build up a big output line. This is more efficient than
             * calling UTL_FILE.PUT inside the loop.
             */
            v_newline := NULL;
            FOR i IN 1 .. v_colcount LOOP
                DBMS_SQL.COLUMN_VALUE( v_cursorid, i, v_columnvalue );
                if i = 1 then
                  v_newline := v_newline||v_columnvalue;
                else
                  v_newline := v_newline||v_delimiter||v_columnvalue;
                end if;              
            END LOOP;
    
            /* Now print out that line and increment a counter. */
            UTL_FILE.PUT_LINE( filehandle, v_newline||v_rowdelimiter );
            counter := counter+1;
        END LOOP;
    
    IF P_write_rec_layout = 1 THEN
    
    --Start Footer
        v_newline := CHR(10)||CHR(10)||'  ----------------------------------  ';
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Generated:     '||SYSDATE;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Generated by:  '||USER;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Columns Count: '||v_colcount;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Records Count: '||counter;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Delimiter: '||v_delimiter;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
           v_newline := '  Row Delimiter: '||v_rowdelimiter;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
        v_newline := '  ----------------------------------  '||CHR(10)||CHR(10);
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
    --End Footer
    
    --Start Comment
        v_newline := '  '||P_comment;
          UTL_FILE.PUT_LINE(filehandle_rc, v_newline);
    --End Comment
    
    UTL_FILE.FCLOSE(filehandle_rc);
    
    END IF;
    
        /* Free up resources. */
        DBMS_SQL.CLOSE_CURSOR(v_cursorid);
        UTL_FILE.FCLOSE( filehandle );
    
        /* Reset the date format ... and return. */
        v_stat := 'alter session set nls_date_format=''' || v_dateformat || ''' ';
        EXECUTE IMMEDIATE v_stat;
    
        RETURN counter;
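    EXCEPTION
        WHEN OTHERS THEN
            /* Release the cursor and any open file handles, restore the
             * session date format and return the number of rows written so far.
             * Note that the error itself is not re-raised. */
            IF v_cursorid IS NOT NULL AND DBMS_SQL.IS_OPEN( v_cursorid ) THEN
                DBMS_SQL.CLOSE_CURSOR( v_cursorid );
            END IF;
            IF UTL_FILE.IS_OPEN( filehandle ) THEN
                UTL_FILE.FCLOSE( filehandle );
            END IF;
            IF UTL_FILE.IS_OPEN( filehandle_rc ) THEN
                UTL_FILE.FCLOSE( filehandle_rc );
            END IF;
            IF v_dateformat IS NOT NULL THEN
                EXECUTE IMMEDIATE 'alter session set nls_date_format=''' || v_dateformat || ''' ';
            END IF;
            RETURN counter;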
    
    END Dump_Delimited;
    /
    
    SHOW ERRORS;
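
    With the default settings, every line in the generated file holds the column values joined by "][" and ends with the row delimiter "]". As a rough illustration of how a consumer could read such a line back, here is a minimal Java sketch. The class name DelimitedLineParser and the sample line are invented for this example, and the sketch assumes that no column value contains the column delimiter or a line break.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical reader for one line of a file written by Dump_Delimited.
class DelimitedLineParser {

    static List<String> parseLine(String line, String columnDelimiter, String rowDelimiter) {
        // Strip the trailing row delimiter, if present.
        String body = line.endsWith(rowDelimiter)
                ? line.substring(0, line.length() - rowDelimiter.length())
                : line;
        // Split on the literal column delimiter; limit -1 keeps trailing empty (NULL) columns.
        return Arrays.asList(body.split(Pattern.quote(columnDelimiter), -1));
    }

    public static void main(String[] args) {
        // Sample line (made up): four columns, the third one is NULL in the database.
        List<String> fields = parseLine("SYS][DUAL][][TABLE]", "][", "]");
        System.out.println(fields.size() + " columns: " + fields);
    }
}
```

    The delimiter is quoted with Pattern.quote because "][" is not a valid regular expression on its own. NULL values arrive as empty strings, so a consumer cannot tell a NULL from an empty text value.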
    

    The post Dump Oracle data into a delimited ascii file with PL/SQL appeared first on AMIS Oracle and Java Blog.

    DIY Parallelization with Oracle DBMS_DATAPUMP

    Thu, 2017-02-23 12:17

    Oracle dbms_datapump provides a parallel option for exports and imports, but some objects cannot be processed in this mode. In a migration project from AIX 11gR2 to ODA X5-2 (OL 5.9) 12c that included an initial load for GoldenGate, I had to deal with one of those objects: a 600 GB table with LOB columns, stored in the database as BasicFiles (the traditional LOB storage).

    By applying some DIY parallelization I was able to reduce the export time from 14 hours to 35 minutes.
    Instrumental in this solution are the handy "detach" feature of the dbms_datapump package and the use of dbms_rowid to split the table data into equally sized chunks. The first allowed me to define and start datapump jobs without having to wait until each one has finished; the second makes all jobs finish within a short time of each other.
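
    To see why splitting on rowid information gives chunks of nearly the same size, consider the following small Java simulation. It is only a sketch under an assumption: it assigns each row to a chunk by taking the block number of its rowid modulo the number of jobs, which is one common way to split a table and not necessarily the exact predicate used in this package. The class name RowChunkingSketch and the block and row counts are invented for the example.

```java
// Hypothetical simulation of splitting table rows over parallel export jobs.
class RowChunkingSketch {

    // Assumed chunk assignment: block number modulo the degree of parallelism (dop).
    static int chunkOf(long blockNumber, int dop) {
        return (int) (blockNumber % dop);
    }

    // Counts the rows that end up in each chunk for a table of evenly filled blocks.
    static long[] rowsPerChunk(long blocks, int rowsPerBlock, int dop) {
        long[] counts = new long[dop];
        for (long block = 0; block < blocks; block++) {
            counts[chunkOf(block, dop)] += rowsPerBlock;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] counts = rowsPerChunk(1_000_000L, 10, 32);
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long c : counts) {
            min = Math.min(min, c);
            max = Math.max(max, c);
        }
        // The chunks differ by at most one block's worth of rows.
        System.out.println("smallest chunk: " + min + " rows, largest chunk: " + max + " rows");
    }
}
```

    Because consecutive blocks are spread round-robin over the jobs, no single job ends up with a much larger share of the data, which is what lets all jobs finish at about the same time. Real tables have unevenly filled blocks, so the balance is approximate.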

    The following PL/SQL exports tables in 32 chunks with 32 concurrent datapump jobs. Feel free to adjust this degree of parallelism ("dop") as well as the schema and table names. The export procedure takes just one parameter: it makes the procedure as a whole wait until all exports have ended, so that some other action (e.g. a file transfer) can start automatically afterwards.

    CREATE OR REPLACE PACKAGE Datapump_Parallel_Exp_Pck                                                                                                                                                                 
      IS                                                                                                                                                                                                  
        g_parallel   CONSTANT NUMBER       := 32;                                                                                                                               
        g_dmp_dir    CONSTANT VARCHAR2(25) := 'DATA_PUMP_DIR';                                                                                                                          
                                                                                                                                                                                                            
    ------------------------------------------------------------------------------------------------- 
    PROCEDURE Exec_Export
       ( P_wait IN PLS_INTEGER := 0 );                                                                                                                                                                   
                                                                                                                                                                                                            
    --------------------------------------------------------------------------------------------------                                                                                                      
    END Datapump_Parallel_Exp_Pck;
    /
    
    SHOW ERRORS;
    
    
    CREATE OR REPLACE PACKAGE BODY Datapump_Parallel_Exp_Pck                                                                                                                                                            
      IS                                                                                                                                                                                                    
                                                                                                                                                                                                            
    -------------------------------------------------------------------------------------------------                                                                                                       
    PROCEDURE Sleep                                                                                                                                                                                         
      (P_millisesconds IN NUMBER)                                                                                                                                                                           
     AS LANGUAGE JAVA                                                                                                                                                                                       
        NAME 'java.lang.Thread.sleep(long)';
                                                                                                                                                                                                            
    -------------------------------------------------------------------------------------------------                                                                                                       
    FUNCTION Get_Current_Scn                                                                                                                                                                                
      RETURN NUMBER                                                                                                                                                                                         
        IS                                                                                                                                                                                                  
        v_ret NUMBER := 0;                                                                                                                                                                                  
    BEGIN                                                                                                                                                                                                   
                                                                                                                                                                                                            
      SELECT current_scn                                                                                                                                                                                    
        INTO v_ret                                                                                                                                                                                          
      FROM v$database;                                                                                                                                                                                      
                                                                                                                                                                                                            
      RETURN v_ret;                                                                                                                                                                                         
                                                                                                                                                                                                            
      EXCEPTION                                                                                                                                                                                             
        WHEN OTHERS THEN                                                                                                                                                                                    
       RAISE_APPLICATION_ERROR( -20010, SQLERRM||' - '||DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );                                                                                                              
    END Get_Current_Scn;                                                                                                                                                                                    
                                                                                                                                                                                                            
    -------------------------------------------------------------------------------------------------                                                                                                       
    PROCEDURE Exp_Tables_Parallel                                                                                                                                                                   
      ( P_scn  IN NUMBER                                                                                                                                                                                    
      , P_dmp OUT VARCHAR2 )                                                                                                                                                                                
     IS                                                                                                                                                                                                     
       h1                  NUMBER(10);                                                                                                                                                                      
       v_dop               NUMBER := g_parallel;                                                                                                                                                            
       v_curr_scn          NUMBER := P_scn;                                                                                                                                                                 
       v_job_name_org      VARCHAR2(30)  := 'PX_'||TO_CHAR(sysdate,'YYYYMMDDHH24MISS');    -- PX: Parallel Execution                                                                                     
       v_job_name          VARCHAR2(30)  := v_job_name_org;                                                                                                                                                 
       v_dmp_file_name_org VARCHAR2(100) := lower(v_job_name||'.dmp');                                                                                                                                      
       v_dmp_file_name     VARCHAR2(100) := v_dmp_file_name_org;                                                                                                                                            
       v_log_file_name_org VARCHAR2(100) := lower(v_job_name||'.log');                                                                                                                                      
       v_log_file_name     VARCHAR2(100) := v_log_file_name_org;                                                                                                                                            
                                                                                                                                                                                                            
    BEGIN                                                                                                                                                                                                   
                                                                                                                                                                                                            
    -- drop master table for "orphaned job" if it exists                                                                                                                                                       
       for i in ( select 'DROP TABLE '||owner_name||'.'||job_name||' PURGE' stat                                                                                                                            
                  from dba_datapump_jobs                                                                                                                                                                    
                  where owner_name = USER                                                                                                                                                                   
                    and instr(v_job_name, upper(job_name) ) > 0                                                                                                                                             
                    and state = 'NOT RUNNING'                                                                                                                                                               
                    and attached_sessions = 0 )                                                                                                                                                             
       loop                                                                                                                                                                                                 
         execute immediate i.stat;                                                                                                                                                                          
       end loop;                                                                                                                                                                                            
                                                                                                                                                                                                            
    -- set out parameter                                                                                                                                                                                    
      P_dmp := v_dmp_file_name;                                                                                                                                                                             
                                                                                                                                                                                                            
    -- start jobs in parallel                                                                                                                                                                               
      DBMS_OUTPUT.PUT_LINE('**** START SETTING DATAPUMP PARALLEL_TABLE_EXPORT JOBS ****' );                                                                                                                 
      for counter in 0 .. v_dop-1                                                                                                                                                                           
      loop                                                                                                                                                                                                  
        v_job_name      := v_job_name_org||'_'||lpad(counter+1,3,0);                                                                                                                                        
        v_dmp_file_name := v_dmp_file_name_org||'_'||lpad(counter+1,3,0);                                                                                                                                   
        v_log_file_name := v_log_file_name_org||'_'||lpad(counter+1,3,0);                                                                                                                                   
                                                                                                                                                                                                            
        h1 := dbms_datapump.open                                                                                                                                                                            
          ( operation => 'EXPORT'                                                                                                                                                                           
          , job_mode  => 'SCHEMA'                                                                                                                                                                           
          , job_name  => v_job_name                                                                                                                                                                         
          , version   => 'LATEST');                                                                                                                                                                         
       DBMS_OUTPUT.PUT_LINE( 'Successfully opened job: '||v_job_name);                                                                                                                                     
                                                                                                                                                                                                            
         dbms_datapump.set_parallel(handle  => h1, degree => 1);                                                                                                                                            
         dbms_datapump.set_parameter(handle => h1, name  => 'KEEP_MASTER', value => 0);                                                                                                                     
         dbms_datapump.set_parameter(handle => h1, name  => 'ESTIMATE', value => 'BLOCKS');                                                                                                                 
         dbms_datapump.set_parameter(handle => h1, name  => 'INCLUDE_METADATA', value => 0);                                                                                                                
         dbms_datapump.set_parameter(handle => h1, name  => 'METRICS', value => 1);                                                                                                                         
         dbms_datapump.set_parameter(handle => h1, name  => 'FLASHBACK_SCN', value => v_curr_scn);                                                                                                          
       DBMS_OUTPUT.PUT_LINE('Successfully set job parameters for job '||v_job_name);                                                                                                                        
                                                                                                                                                                                                            
    -- export just these schemas                                                                                                                                                                            
         dbms_datapump.metadata_filter(handle => h1, name => 'SCHEMA_LIST', value => ' ''<SCHEMA01>'',''<SCHEMA02>'',''<SCHEMA03>'' ');                                                                                       
       DBMS_OUTPUT.PUT_LINE('Successfully set schemas for job '||v_job_name);                                                                                                                               
    -- export tables only                                                                                                                                                                                   
         dbms_datapump.metadata_filter(handle => h1, name => 'INCLUDE_PATH_EXPR', value => q'[='TABLE']' );                                                                                                 
       DBMS_OUTPUT.PUT_LINE('Successfully set table export for job '||v_job_name);                                                                                                                          
    -- export just these tables                                                                                                                                                                            
         dbms_datapump.metadata_filter(handle => h1, name => 'NAME_LIST', value => ' ''<TABLE01>'',''<TABLE02>'',''<TABLE03>'',''<TABLE04>'' ', object_path => 'TABLE');
       DBMS_OUTPUT.PUT_LINE('Successfully set tables for job '||v_job_name);                                                                                                                                
    -- export just a 1/v_dop part of the data                                                                                                                                                             
         dbms_datapump.data_filter(handle => h1, name => 'SUBQUERY', value => 'WHERE MOD(DBMS_ROWID.ROWID_BLOCK_NUMBER(ROWID), '||v_dop||')='||counter);                                                    
       DBMS_OUTPUT.PUT_LINE('Successfully set data filter for job '||v_job_name);                                                                                                                          
                                                                                                                                                                                                            
         dbms_datapump.add_file                                                                                                                                                                             
           ( handle => h1                                                                                                                                                                                   
           , filename => v_dmp_file_name                                                                                                                                                                    
           , directory => g_dmp_dir                                                                                                                                                               
           , filetype => DBMS_DATAPUMP.KU$_FILE_TYPE_DUMP_FILE                                                                                                                                              
           , reusefile => 1 );                                                                                                                                                                              
       DBMS_OUTPUT.PUT_LINE('Successfully added dmp file: '||v_dmp_file_name);
                                                                                                                                                                                                            
         dbms_datapump.add_file                                                                                                                                                                             
           ( handle => h1                                                                                                                                                                                   
           , filename => v_log_file_name                                                                                                                                                                    
           , directory => g_dmp_dir                                                                                                                                                               
           , filetype => DBMS_DATAPUMP.KU$_FILE_TYPE_LOG_FILE);                                                                                                                                             
       DBMS_OUTPUT.PUT_LINE('Successfully added log file: '||v_log_file_name );
                                                                                                                                                                                                            
         dbms_datapump.log_entry(handle => h1, message => 'Job '||(counter+1)||'/'||v_dop||' starting at '||to_char(sysdate, 'dd-mon-yyyy hh24:mi:ss')||' as of scn: '||v_curr_scn );                       
         dbms_datapump.start_job(handle => h1, skip_current => 0, abort_step => 0);                                                                                                                         
       DBMS_OUTPUT.PUT_LINE('Successfully started job '||(counter+1)||'/'||v_dop||' at '||to_char(sysdate,'dd-mon-yyyy hh24:mi:ss') ||' as of scn: '||v_curr_scn );                                        
                                                                                                                                                                                                            
         dbms_datapump.detach(handle => h1);                                                                                                                                                                
       DBMS_OUTPUT.PUT_LINE('Successfully detached from job' );                                                                                                                                            
                                                                                                                                                                                                            
      end loop;                                                                                                                                                                                             
      DBMS_OUTPUT.PUT_LINE('**** END SETTING DATAPUMP PARALLEL_TABLE_EXPORT JOBS ****' );                                                                                                                   
                                                                                                                                                                                                            
    EXCEPTION                                                                                                                                                                                               
      WHEN OTHERS THEN                                                                                                                                                                                      
        dbms_datapump.detach(handle => h1);                                                                                                                                                                 
        DBMS_OUTPUT.PUT_LINE('Successfully detached from job' );                                                                                                                                            
        DBMS_OUTPUT.PUT_LINE('Error: '||SQLERRM||' - '||DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );                                                                                                              
        DBMS_OUTPUT.PUT_LINE('**** END SETTING DATAPUMP PARALLEL_TABLE_EXPORT JOBS ****' );                                                                                                                 
        RAISE_APPLICATION_ERROR( -20010, SQLERRM||' - '||DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );                                                                                                             
    END Exp_Tables_Parallel;                                                                                                                                                                        
                                                                                                       
    -------------------------------------------------------------------------------------------------                                                                                                       
    PROCEDURE Exec_Export
       ( P_wait IN PLS_INTEGER := 0 )                                                                                                                                                                   
      IS                                                                                                                                                                                                    
      v_scn         NUMBER;                                                                                                                                                                                     
      v_dmp         VARCHAR2(200);
      export_done   PLS_INTEGER := 0;                                                                                                                                                                              
                                                                                                                                                                                                            
    BEGIN                                                                                                                                                                                                   
                                                                                                                                                                                                            
    -- get current scn                                                                                                                                                                                      
      v_scn := Get_Current_Scn;                                                                                                                                                                             
                                                                                                                                                                                                            
    -- start parallel export processes + detach                                                                                                                                                             
      Exp_Tables_Parallel( v_scn, v_dmp );                                                                                                                                                       
    
      if P_wait = 1 then
    -- wait till all parallel export processes are finished 
    -- check every 5 minutes                                                                                                                                                                                    
        export_done := 0;
        loop                                                                                                           
          for i in ( select 1                                                                                                                                                                               
                     from ( select count(*) cnt                                                                                                                                                             
                            from user_tables                                                                                                                                                                
                            where instr(table_name,upper(replace(v_dmp,'.dmp'))) > 0 )                                                                                                                   
                     where cnt = 0 )                                                                                                                                                                        
          loop                                                                                                                                                                                              
            export_done := 1;                                                                                                                                                      
          end loop;
        
          if export_done = 1 then
            exit;
          end if;
          Sleep(300000);
        end loop; 
      end if;
                                                                                                                                                                                                            
    EXCEPTION                                                                                                                                                                                               
      WHEN OTHERS THEN                                                                                                                                                                                      
        DBMS_OUTPUT.PUT_LINE('Error: '||SQLERRM||' - '||DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );                                                                                                              
        RAISE_APPLICATION_ERROR( -20010, SQLERRM||' - '||DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );                                                                                                             
    END Exec_Export;                                                                                                                                                                                
                                                                                                                                                                                                            
    --------------------------------------------------------------------------------------------------------                                                                                                
    END Datapump_Parallel_Exp_Pck;
    /
    
    SHOW ERRORS;
    

    The post DIY Parallelization with Oracle DBMS_DATAPUMP appeared first on AMIS Oracle and Java Blog.

    AMIS Tools Showroom – The Sequel – Thursday 16 March 2017

    Mon, 2017-02-20 23:25

    Thursday 16 March

    17:00-21:00

    AMIS, Nieuwegein

    Register via: bu.om@amis.nl

    On Thursday 16 March the second AMIS Tools Showroom session takes place. The first session was on 13 December: in it, 16 AMIS colleagues showed all kinds of handy tools and utilities in short and very short presentations and demonstrations. The emphasis in that session was on tools for monitoring, communication and collaboration.

    In this second session we go looking for another collection of tools. This invitation therefore covers two aspects:

    · Do you want to be there on 16 March to have tools presented to you by your peers?

    · Do you have a tool you would like to present during this session? Think for example of tools for web conferencing & video streaming, screen cams, text editing, chat, image editing, data visualization, document sharing, voice recognition. And other tools, apps and plugins that you find useful in your work and would like to show to your colleagues, in a short presentation (5-15 min), preferably with a demo.

    Could you indicate via the following form which tools are interesting to you and which tool you would like to present: https://docs.google.com/forms/d/e/1FAIpQLSdNPwUACXxWaZGfs911UraVFQp5aWqeJVEx0xrSRFQTcYnYXA/viewform .
    Based on the results of this survey we can put together the agenda for this session.

    The post AMIS Tools Showroom – The Sequel – Thursday 16 March 2017 appeared first on AMIS Oracle and Java Blog.

    Node.js application writing to MongoDB – Kafka Streams findings read from Kafka Topic written to MongoDB from Node

    Mon, 2017-02-20 08:35

    MongoDB is a popular, lightweight, highly scalable, very fast and easy to use NoSQL document database. Written in C++, working with JSON documents (stored in the binary format BSON) and processing JavaScript commands with an embedded JavaScript engine, MongoDB easily ties into many different languages and platforms, one of which is Node.JS. In this article, I first describe how a very simple interaction between Node.JS and MongoDB can be implemented.

     

    Then I do something a little more challenging: the Node.JS application consumes messages from an Apache Kafka topic and writes these messages to a MongoDB database collection, to make the results available for many clients to read and query. Finally I will show a little analytical query against the MongoDB collection, to retrieve some information we would not have been able to get from the plain Kafka Topic (although with Kafka Streams it just may be possible as well).

    You will see the MongoDB driver for Node.JS in action, as well as the kafka-node driver for Apache Kafka from Node.JS. All resources are in the GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.

    Prerequisites

    Node.JS is installed, as is MongoDB.

    Run the MongoDB server. On Windows, the command is mongod, optionally followed by the dbpath parameter to specify the directory in which the data files are to be stored:

    mongod --dbpath c:\node\nodetest1\data\

    For the part where messages are consumed from a Kafka Topic, a running Apache Kafka Cluster must be available, as described in more detail in several previous articles such as https://technology.amis.nl/2017/02/13/kafka-streams-and-nodejs-consuming-and-periodically-reporting-in-node-js-on-the-results-from-a-kafka-streams-streaming-analytics-application/.

     

    Getting Started

    Start a new Node application, using npm init.

    Into this application, install the npm packages kafka-node and mongodb:

    npm install mongodb --save

    npm install kafka-node --save

    This installs the two Node modules with their dependencies and adds them to package.json.

     

    First Node Program – for Creating and Updating Two Static Documents

    This simple Node.JS program uses the mongodb driver for Node and connects to a MongoDB server running locally and a database called test. It then tries to update two documents in the top3 collection in the test database; if a document does not yet exist (based on the key, which is the continent property) it is created. When the application is done running, two documents exist (and have their lastModified property set if they were updated).

    var MongoClient = require('mongodb').MongoClient;
    var assert = require('assert');
    
    // connect string for mongodb server running locally, connecting to a database called test
    var url = 'mongodb://127.0.0.1:27017/test';
    
    MongoClient.connect(url, function(err, db) {
      assert.equal(null, err);
      console.log("Connected correctly to server.");
       var doc = {
            "continent" : "Europe",
             "nrs" : [ {"name":"Belgium"}, {"name":"Luxemburg"}]
          };
       var doc2 = {
            "continent" : "Asia",
             "nrs" : [ {"name":"China"}, {"name":"India"}]
          };
      insertDocument(db,doc, function() {
        console.log("returned from processing doc "+doc.continent);  
        insertDocument(db,doc2, function() {
          console.log("returned from processing doc "+doc2.continent);          
          db.close();
          console.log("Connection to database is closed. Two documents should exist, either just created or updated. ");
          console.log("From the MongoDB shell: db.top3.find() should list the documents. ");
        });
      });
    });
    
    var insertDocument = function(db, doc, callback) {
       // first try to update; if a document could be updated, we're done 
       console.log("Processing doc for "+doc.continent);
       updateTop3ForContinent( db, doc, function (results) {      
           if (!results || results.result.n == 0) {
              // the document was not updated so presumably it does not exist; let's insert it  
              db.collection('top3').insertOne( 
                    doc
                  , function(err, result) {
                       assert.equal(err, null);
                       callback();
                    }
                  );   
           }//if
           else {
             callback();
           }
     }); //updateTop3ForContinent
    }; //insertDocument
    
    var updateTop3ForContinent = function(db, top3 , callback) {
       db.collection('top3').updateOne(
          { "continent" : top3.continent },
          {
            $set: { "nrs": top3.nrs },
            $currentDate: { "lastModified": true }
          }, function(err, results) {
            // fail fast on a driver error instead of treating it as "no document updated"
            assert.equal(err, null);
            callback(results);
       });
    };
    
    

    The console output from the Node application:

    image

    The output on the MongoDB Shell:

    image

    Note: I have used db.top3.find() three times: before running the Node application, after it has run once and after it has run a second time. After the second run, the lastModified property has been added, because the documents were then updated rather than inserted.
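
The update-then-insert sequence in the program above needs two round trips whenever a document is missing. As an alternative sketch (not what the program above does), the driver's updateOne call accepts an upsert: true option that inserts the document when the filter matches nothing. The helper name buildTop3Upsert is hypothetical; it only builds the three arguments, so it runs without a database connection.

```javascript
// Hypothetical helper: builds the arguments for a single upsert call.
// Assumption: documents are keyed on their continent property, as in the program above.
function buildTop3Upsert(doc) {
  return {
    filter: { continent: doc.continent },
    update: {
      $set: { nrs: doc.nrs },
      $currentDate: { lastModified: true }
    },
    // upsert: insert a new document when the filter matches none
    options: { upsert: true }
  };
}

var args = buildTop3Upsert({
  continent: "Europe",
  nrs: [{ name: "Belgium" }, { name: "Luxemburg" }]
});
console.log(JSON.stringify(args, null, 2));

// With a connected db handle this would be passed on as:
// db.collection('top3').updateOne(args.filter, args.update, args.options, callback);
```

One difference to be aware of: with an upsert the update operators are also applied to a newly inserted document, so lastModified would be set on the first run as well, unlike in the program above.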

    Second Node Program – Consume messages from Kafka Topic and Update MongoDB accordingly

    This application registers as a Kafka Consumer on the Topic Top3CountrySizePerContinent. Each message that is produced to that topic is consumed by the Node application and handled by the function handleCountryMessage. This function parses the JSON message received from Kafka, adds a property continent derived from the key of the Kafka message, and calls the insertDocument function. That function attempts to update a record in the MongoDB collection top3 that has the same continent property value as the document passed in as parameter. If the update succeeds, the handling of the Kafka message is complete and the MongoDB collection contains the most recent standings produced by the Kafka Streams application. If the update does not modify any document, presumably there is no record yet for the current continent. In that case, a new document is inserted for the continent.

    /*
    This program connects to MongoDB (using the mongodb module )
    This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.
    
    This program records each latest update of the top 3 largest countries for a continent in MongoDB. If a document does not yet exist for a continent (based on the key which is the continent property) it is inserted.
    
    The program ensures that the MongoDB /test/top3 collection contains the latest Top 3 for each continent at any point in time.
    
    */
    
    var MongoClient = require('mongodb').MongoClient;
    var assert = require('assert');
    
    var kafka = require('kafka-node')
    var Consumer = kafka.Consumer
    var client = new kafka.Client("ubuntu:2181/")
    var countriesTopic = "Top3CountrySizePerContinent";
    
    
    // connect string for mongodb server running locally, connecting to a database called test
    var url = 'mongodb://127.0.0.1:27017/test';
    var mongodb;
    
    MongoClient.connect(url, function(err, db) {
      assert.equal(null, err);
      console.log("Connected correctly to MongoDB server.");
      mongodb = db;
    });
    
    var insertDocument = function(db, doc, callback) {
       // first try to update; if a document could be updated, we're done 
       updateTop3ForContinent( db, doc, function (results) {      
           if (!results || results.result.n == 0) {
              // the document was not updated so presumably it does not exist; let's insert it  
              db.collection('top3').insertOne( 
                    doc
                  , function(err, result) {
                       assert.equal(err, null);
                       console.log("Inserted doc for "+doc.continent);
                       callback();
                    }
                  );   
           }//if
           else {
             console.log("Updated doc for "+doc.continent);
             callback();
           }
     }); //updateTop3ForContinent
    }; //insertDocument
    
    var updateTop3ForContinent = function(db, top3 , callback) {
       db.collection('top3').updateOne(
          { "continent" : top3.continent },
          {
            $set: { "nrs": top3.nrs },
            $currentDate: { "lastModified": true }
          }, function(err, results) {
            // fail fast on a driver error instead of treating it as "no document updated"
            assert.equal(err, null);
            callback(results);
       });
    };
    
    // Configure Kafka Consumer for Kafka Top3 Topic and handle Kafka message (by calling handleCountryMessage, which records the top3 in MongoDB)
    var consumer = new Consumer(
      client,
      [],
      {fromOffset: true}
    );
    
    consumer.on('message', function (message) {
      handleCountryMessage(message);
    });
    
    consumer.addTopics([
      { topic: countriesTopic, partition: 0, offset: 0}
    ], () => console.log("topic "+countriesTopic+" added to consumer for listening"));
    
    function handleCountryMessage(countryMessage) {
        var top3 = JSON.parse(countryMessage.value);
        var continent = new Buffer(countryMessage.key).toString('ascii');
        top3.continent = continent;
        // insert or update the top3 in the MongoDB server
        insertDocument(mongodb,top3, function() {
          console.log("Top3 recorded in MongoDB for "+top3.continent);  
        });
    
    }// handleCountryMessage
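
    The update-then-insert sequence in insertDocument could probably also be written as a single upsert. The sketch below assumes the same updateOne call as in the listing, with the driver's upsert option added. The helper name upsertTop3 and the in-memory stub collection are hypothetical; the stub only exists so the snippet runs without a MongoDB server.

```javascript
// Sketch: record the Top 3 for a continent with one upsert instead of update-then-insert.
// upsertTop3 is a hypothetical helper; `collection` is expected to offer the driver's
// updateOne(filter, update, options, callback) signature.
function upsertTop3(collection, top3, callback) {
  collection.updateOne(
    { continent: top3.continent },
    {
      $set: { nrs: top3.nrs },
      $currentDate: { lastModified: true }
    },
    { upsert: true }, // insert a new document when no document matches the filter
    callback
  );
}

// Hypothetical in-memory stand-in for db.collection('top3'), only for trying the helper
// without a MongoDB server. It mimics just enough of updateOne for this example.
function createStubCollection() {
  var docs = {};
  return {
    docs: docs,
    updateOne: function (filter, update, options, cb) {
      var key = filter.continent;
      var existed = Object.prototype.hasOwnProperty.call(docs, key);
      if (!existed && !(options && options.upsert)) {
        return cb(null, { matchedCount: 0, upsertedCount: 0 });
      }
      docs[key] = Object.assign({ continent: key }, docs[key], update.$set, { lastModified: new Date() });
      cb(null, { matchedCount: existed ? 1 : 0, upsertedCount: existed ? 0 : 1 });
    }
  };
}

var stub = createStubCollection();
var outcomes = [];
var france = { name: "France", size: 643801 };
var germany = { name: "Germany", size: 357022 };

upsertTop3(stub, { continent: "Europe", nrs: [france] }, function (err, r) {
  outcomes.push(r.upsertedCount === 1 ? "inserted" : "updated");
});
upsertTop3(stub, { continent: "Europe", nrs: [france, germany] }, function (err, r) {
  outcomes.push(r.upsertedCount === 1 ? "inserted" : "updated");
});
console.log(outcomes.join(", "));
```

    With a real collection, the second argument to the callback reports matchedCount and upsertedCount in the same way, so the "Inserted" and "Updated" log lines of the original program can be kept.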
    
    

    Running the application produces the following output.

    Producing Countries:

    image

    Producing Streaming Analysis – Running Top 3 per Continent:

    image

    Processing Kafka Messages:

    image

    Resulting MongoDB collection:

    image

    And after a little while, here is the latest situation for Europe and Asia in the MongoDB collection :

    image

    Resulting from processing the latest Kafka Stream result messages:

    image

     

     

    Querying the MongoDB Collection

    The current set of top3 documents – one for each continent – stored in MongoDB can be queried, using MongoDB find and aggregation facilities.

    One query we can perform is to retrieve the top 5 largest countries in the world. Here is the query that gives us that insight. First it creates a single record per country (using $unwind to flatten the nrs array in each top3 document). The countries are then sorted by size (descending) and the first 5 of the sort result are retained. These five are then projected into a nicer looking output document that only contains continent, country and area fields.

    db.top3.aggregate([ 
       {$project: {nrs:1}}
      ,{$unwind:'$nrs'}
      , {$sort: {"nrs.size":-1}}
      , {$limit:5}
      , {$project: {continent:'$nrs.continent', country:'$nrs.name', area:'$nrs.size' }}
    ])
    

    image

    (Because no continent has its number 3 country among the first four of this list, no country outside the stored top 3s can be larger than any of these five, so we can be sure that this top 5 is the actual top 5 of the world.)
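
    To make the stages of the pipeline more tangible, here is a rough replay of the same steps on plain JavaScript arrays. It is an illustration only, not how MongoDB executes the query. The sample documents reuse Top 3 values that appear in the Kafka messages quoted in these articles, the function name top5 is made up for this sketch, and the trailing null placeholders in nrs are simply filtered out.

```javascript
// Illustration only: the stages of the aggregation pipeline, replayed on plain arrays.
var top3Docs = [
  { continent: "Africa", nrs: [
      { name: "Algeria", size: 2381741, continent: "Africa" },
      { name: "Democratic Republic of the Congo", size: 2344858, continent: "Africa" },
      { name: "Sudan", size: 1861484, continent: "Africa" },
      null ] },
  { continent: "Europe", nrs: [
      { name: "France", size: 643801, continent: "Europe" },
      { name: "Germany", size: 357022, continent: "Europe" },
      { name: "Finland", size: 338145, continent: "Europe" },
      null ] }
];

function top5(docs) {
  return docs
    // $unwind: one record per element of nrs (null placeholders are dropped in this sketch)
    .reduce(function (all, doc) { return all.concat(doc.nrs.filter(Boolean)); }, [])
    // $sort: largest size first
    .sort(function (a, b) { return b.size - a.size; })
    // $limit: keep the first five
    .slice(0, 5)
    // $project: reshape into continent, country and area
    .map(function (c) { return { continent: c.continent, country: c.name, area: c.size }; });
}

var result = top5(top3Docs);
console.log(result);
```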

     

    Resources

    A very good read – although a little out of date – is this tutorial on 1st and 2nd steps with Node and Mongodb: http://cwbuecheler.com/web/tutorials/2013/node-express-mongo/ 

    MongoDB Driver for Node.js in the official MongoDB documentation: https://docs.mongodb.com/getting-started/node/client/ 

    Kafka Connect for MongoDB – YouTube intro – https://www.youtube.com/watch?v=AF9WyW4npwY 

    Combining MongoDB and Apache Kafka – with a Java application talking and listening to both: https://www.mongodb.com/blog/post/mongodb-and-data-streaming-implementing-a-mongodb-kafka-consumer 

    Tutorials Point MongoDB tutorials – https://www.tutorialspoint.com/mongodb/mongodb_sort_record.htm 

    Data Aggregation with Node.JS driver for MongoDB – https://docs.mongodb.com/getting-started/node/aggregation/

    The post Node.js application writing to MongoDB – Kafka Streams findings read from Kafka Topic written to MongoDB from Node appeared first on AMIS Oracle and Java Blog.

    Public Cloud consequences with an Oracle environment

    Sun, 2017-02-19 10:23

    The title may suggest a negative view on using a Public Cloud. Well, it isn’t. I’m convinced the Cloud is the next BIG thing, with huge advantages for businesses. But companies should be aware of what they choose. A lot of providers, including Oracle, are pushing us to the cloud, Public, Private or Hybrid, and make us believe that a public cloud will be an inevitable extension to our on-premises environment. Moving your WebLogic and database environment, including data, from on-premises to the public cloud and back is said to be no problem, or to become no problem in the future. But what kind of hassle do you have to cope with, technically and business-wise?

    The list of implications and consequences in this post is not exhaustive; it intends to give you an idea of the technical and non-technical consequences of moving to and from the cloud.

    Public Cloud

    The title of this blogpost mentions the ‘Public Cloud’. What I mean here are the Oracle-authorized Public Clouds: Amazon, Azure and of course the Oracle Cloud. More specifically: the PaaS and the IaaS environments. What were the big differences again, and why am I not talking about consequences with a Private Cloud?

    I like to think that a Private Cloud is a better version of the average on-premises environments with at least some of these characteristics of a Public Cloud:

    – Self-serviced

    – Shared services

    – Standardized

    – Virtual

    – Metering, automatic cost allocation and chargeback

    So on-premises may be a Private Cloud, but most of the time it’s not the same. One – not mentioned yet – characteristic is very important: you (your IT-department) should be in control of the environment regarding versions, patches, Service Level Agreements etc.

     

    A Public Cloud has at least the following characteristics next to the one mentioned above:

    – Shared resources with other companies

    – Available through the internet

    And: no department of your company is in full control of versions, patches, Service Level Agreements etc.

     

    So the scope of this article is the Public Cloud.

    Version differences

    When deploying Oracle software in the Public Cloud, you depend on the versions and patches offered by the cloud provider. E.g. Oracle announced it will deploy the most recent patches and releases first in the Oracle Public Cloud; only afterwards will these releases and patches become available for on-premises environments.

    So when you have a development and test environment in the Public Cloud, and production on-premises or in a private cloud, you must be fully aware of the version differences in your life cycle management.

    image

    License differences

    When deploying your software in an IaaS environment it’s possible to ‘Bring Your Own License’. In the Oracle Cloud it’s also possible to be charged through a ‘metered’ or ‘unmetered’ subscription.

    Databases in the Oracle Public Cloud are fully installed with options, and it’s possible and sometimes inevitable to use them. E.g. an extra tablespace you create in the Oracle Cloud is encrypted by default. So when moving this database to on-premises, you must own the Security Option license of the Oracle Database Enterprise Edition to be compliant.

    And when moving a Pluggable Database from the (Oracle) cloud to on-premises, into a Container Database that already holds one Pluggable Database, you run into the Multitenant Option.

    The next picture shows a test of creating two tablespaces in the Oracle Cloud, one default and one with TDE. The result is two tablespaces with TDE.

    image

    Centralized administration

    It’s an ideal world when you are able to manage your databases in the Private and Public Cloud from one central management point. When using the Oracle management solution this should be Oracle Enterprise Manager Cloud Control. It’s relatively simple to deploy agents on-premises and in the Oracle Cloud (PaaS and IaaS). But unfortunately it’s not supported to deploy databases or middleware in Azure or Amazon AWS with Oracle Enterprise Manager. You will get the following message:

     

    image

    It’s technically possible to overcome this, but not supported at the moment. So with Oracle Enterprise Manager you basically have two options: Oracle Public Cloud or the Cloud@Customer option.

    Standards

    Deployment of Oracle PaaS software is automated in the cloud according to the standards dictated by the provider. It would be very convenient if these standards were the same as for the on-premises software, but that’s not always the case. At least, not in a PaaS environment in the Public Cloud.

    In an IaaS environment you generally have almost full control over the deployments. Almost? Yes, you still rely on the version and configuration of the Operating System. To overcome this as well, you have to choose a bare metal option in the Public Cloud.

     

    Azure / Amazon

    Although the Clouds of Microsoft Azure and Amazon are, according to some, way ahead of the Oracle Cloud, the fact is that for Oracle the Oracle Cloud prevails.

    As already said, it’s not possible to manage Oracle software in the Azure / Amazon public cloud with an on-premises Oracle Enterprise Manager. Amazon does support an Agent for an on-premises Enterprise Manager, but not the one you actually want to make your life easier.

    You depend on the breadcrumbs Oracle is willing to share with the other Cloud providers.

     

    SLA

    The business would like to have a Service Level Agreement with the IT department, but the IT department may in turn rely on the SLA of the Public Cloud provider. And that could be a mismatch. For example, a while ago I had a trial agreement with Microsoft Azure for my tests. All was working fine, but suddenly Azure had a dispute with Oracle (I think) and I got the following message.

    image

    You don’t want this in a real environment.

    Security

    There is, and has been, distrust of security in the Public Cloud. I believe the security in the mentioned Public Clouds is generally better than, or at least equal to, what the average company requires.

    Nevertheless you may have to cope with different security-bases and changes of security-roles in your company in a so-called ‘hybrid infrastructure’.

     

    Roles – Cloud Administrator

    As already mentioned, there will be new roles (and more likely new jobs) to manage the Oracle software in the Public Cloud: managing subscriptions, interoperability, compliance etc. New processes, new management, new challenges.

     

    Data Sovereignty

    For a lot of companies it’s important to know where the data actually resides geographically. This data is subject to the laws of the country in which it is located. What is the roadmap of these Public Cloud providers?

     

    Latency

    Separating the database and middleware will cause latency when one of them is in the Public Cloud. Oracle has two solutions for that:

    – Cloud@Customer. Bring the Public Cloud to on-premises with a separate machine, managed by Oracle.

    – Connect the Internet backbone to your company. Equinix now provides access through a direct connect or Equinix Cloud Exchange to Oracle Cloud in five (5) metros in the US, Europe, and Australia. Enterprise customers with a presence in these Equinix IBX data centers can leverage Oracle’s FastConnect giving them a high-performance private access to Oracle Cloud.

     

     

    Resources:

    Private vs hybrid: http://www.stratoscale.com/blog/cloud/private-cloud-vs-public-hybrid-or-multi/?utm_source=twitter&utm_medium=social&utm_campaign=blog_recycle

    Time to embrace the cloud: https://software.dell.com/community/b/en/posts/dbas-embrace-the-cloud

    The post Public Cloud consequences with an Oracle environment appeared first on AMIS Oracle and Java Blog.

    Node.js application using SSE (Server Sent Events) to push updates (read from Kafka Topic) to simple HTML client application

    Sun, 2017-02-19 06:33

    This article describes a simple Node.js application that uses Server Sent Events (SSE) technology to push updates to a simple HTML client, served through the Express framework. The updates originate from messages consumed from a Kafka Topic. Although the approach outlined in this article stands on its own, and does not even depend on Apache Kafka, it also forms the next step in a series of articles that describe a Kafka Streams application that processes messages on a Kafka Topic – deriving running Top N ranks – and produces them to another Kafka Topic. The Node.js application in this current article consumes the Top N messages and pushes them to the HTML client.

    The simple story told by this article is:

    image

    And the complete picture – including the prequel discussed in https://technology.amis.nl/2017/02/12/apache-kafka-streams-running-top-n-grouped-by-dimension-from-and-to-kafka-topic/ – looks like this:

    image

     

    Sources are found on GitHub: https://github.com/lucasjellema/kafka-streams-running-topN/tree/master/kafka-node-express-topN-sse .

    Topics discussed in this article

    Browser, HTML & JavaScript

    • Programmatically add HTML elements
    • Add row to an HTML table and cells to a table row
    • Set Id attribute on HTML elements
    • Loop over all elements in an array using for .. of
    • Subscribe to a SSE server
    • Process an incoming SSE message (onMessage, process JSON)
    • Formatting (large) numeric values in JavaScript strings
    • Concatenating Strings in JavaScript

    Node & Express (server side JavaScript)

    • Consume message from Kafka Topic
    • Serve static HTML documents using Express
    • Expose API through Express that allows SSE clients to register for server sent events
    • Push JSON messages to all SSE clients
    • Execute a function periodically, based on an interval using a Node Timer (setInterval)

    Browser – Client Side – HTML & JavaScript

    The client side of the implementation is a simple HTML document (index.html) with embedded JavaScript. In a real application, the JavaScript should ideally be imported from a separate JavaScript library. In the <script> tag in the <head> of the document is the JavaScript statement that registers the browser as a SSE subscriber:

    var source = new EventSource("../topn/updates");

    The SSE server is located at a path /topn/updates relative to the path where the index.html document was loaded (http://host:port/index.html – downloaded from the public sub directory in the Node application where static resources are located and served from). Requests to this URL path are handled through the Express framework in the Node application.

    On this EventSource object, a message handler is created – with the function to be invoked whenever an SSE event is received on this source:

    source.onmessage = function(event) { … }

    The content of the function is fairly straightforward: the JSON payload from the event is parsed. It contains the name of a continent and an array of the current top 3 countries by size in that continent. Based on this information, the function locates the continent row (if it does not yet exist, the row is created) in the table with top3 records. The top3 in the SSE event is written to the innerHTML property of the second table cell in the continent’s table row.

     

    
    <!DOCTYPE html>
    <html>
      <head>
        <title>Continent and Country Overview</title>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <script>
    	  // assume that API service is published on same server that is the server of this HTML file
        /* example of the JSON payload pushed in an SSE event (taken from the server's console output):
        {"nrs":[{"code":"FR","name":"France","population":66836154,"size":643801,"continent":"Europe"},{"code":"DE","name":"Germany","population":80722792,"size":357022,"continent":"Europe"},{"code":"FI","name":"Finland","population":5498211,"size":338145,"continent":"Europe"},null]}
        */
          var source = new EventSource("../topn/updates");
          source.onmessage = function(event) {
            var top3 = JSON.parse(event.data);
            if (top3.continent) {
            var nrs = top3.nrs;
             var trID = "continent_"+top3.continent;
             // find row in table with id equal to continent
             var tr = document.getElementById(trID);
             // if not found, then add a row
             if (!tr) {
               // table does not yet have a row for the continent, then add it 
               // find table with continents
               var tbl = document.getElementById("continentsTbl");
               // Create an empty <tr> element and insert it at index 1, directly below the header row:
               tr = tbl.insertRow(1);
               tr.setAttribute("id", trID, 0);
               // Insert new cells (<td> elements) at the 1st and 2nd position of the "new" <tr> element:
               var cell1 = tr.insertCell(0);
               cell1.setAttribute("id",trID+"Continent",0);
               var cell2 = tr.insertCell(1);
               cell2.setAttribute("id",trID+"Top3",0);
               // Add some text to the new cells:
               cell1.innerHTML = top3.continent;
             }// tr not found
             var top3Cell = document.getElementById(trID+"Top3");
             var list = "<ol>";
             for (var country of top3.nrs) {
                if (country) {
                    list= list.concat( "<li>",country.name," (size ",country.size.toLocaleString(),")","</li>");
                }
             }//for
             list= list+ "</ol>";
             top3Cell.innerHTML = list;    
            }// if continent    
          };//onMessage
        </script>    
      </head>
      <body>
        <div id="loading">
          <h2>Please wait...</h2>
        </div>
        <div id="result">
          <table id="continentsTbl">
            <tr><td>Continent</td><td>Top 3 Countries by Size</td></tr>
          </table>
        </div>
      </body>
    </html>
    

     

    Node Application – Server Side – JavaScript using Express framework

    The server side in this article consists of a simple Node application that leverages the Express module as well as the kafka-node module. A simple, generic SSE library is used – in the file sse.js. It exports the Connection object – that represents the SSE channel to a single client – and the Topic object that manages a collection of Connections (for all SSE consumers around a specific subject). When the HTTP response underlying a Connection is closed (on close), the Connection is removed from the Topic’s collection.

    "use strict";
    
    console.log("loading sse.js");
    
    // ... with this middleware:
    function sseMiddleware(req, res, next) {
        console.log(" sseMiddleware is activated with "+ req+" res: "+res);
        res.sseConnection = new Connection(res);
        console.log(" res has now connection  res: "+res.sseConnection );
        next();
    }
    exports.sseMiddleware = sseMiddleware;
    /**
     * A Connection is a simple SSE manager for 1 client.
     */
    var Connection = (function () {
        function Connection(res) {
              console.log(" sseMiddleware construct connection for response ");
      
            this.res = res;
        }
        Connection.prototype.setup = function () {
            console.log("set up SSE stream for response");
            this.res.writeHead(200, {
                'Content-Type': 'text/event-stream',
                'Cache-Control': 'no-cache',
                'Connection': 'keep-alive'
            });
        };
        Connection.prototype.send = function (data) {
            console.log("send event to SSE stream "+JSON.stringify(data));
            this.res.write("data: " + JSON.stringify(data) + "\n\n");
        };
        return Connection;
    }());
    
    exports.Connection = Connection;
    /**
     * A Topic handles a bundle of connections with cleanup after lost connection.
     */
    var Topic = (function () {
        function Topic() {
              console.log(" constructor for Topic");
      
            this.connections = [];
        }
        Topic.prototype.add = function (conn) {
            var connections = this.connections;
            connections.push(conn);
            console.log('New client connected, now: ', connections.length);
            conn.res.on('close', function () {
                var i = connections.indexOf(conn);
                if (i >= 0) {
                    connections.splice(i, 1);
                }
                console.log('Client disconnected, now: ', connections.length);
            });
        };
        Topic.prototype.forEach = function (cb) {
            this.connections.forEach(cb);
        };
        return Topic;
    }());
    exports.Topic = Topic;
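
    The Connection.send function above writes each event in the text/event-stream format: a line starting with "data: " followed by an empty line. As a small sketch of that framing, the hypothetical helper below (formatSseEvent is not part of sse.js) builds the same frame and optionally adds the event and id fields that the SSE format also defines.

```javascript
// Sketch of the text/event-stream framing that Connection.send relies on.
// formatSseEvent is a hypothetical helper, not part of sse.js.
function formatSseEvent(data, options) {
  var opts = options || {};
  var frame = "";
  if (opts.event) { frame += "event: " + opts.event + "\n"; }      // optional event name
  if (opts.id !== undefined) { frame += "id: " + opts.id + "\n"; } // optional event id
  // JSON.stringify (without an indent argument) emits no raw newlines,
  // so the payload always fits on a single data line
  frame += "data: " + JSON.stringify(data) + "\n";
  return frame + "\n"; // the empty line marks the end of the event
}

var plainFrame = formatSseEvent({ continent: "Europe", nrs: [] });
var namedFrame = formatSseEvent("x", { event: "heartbeat", id: 7 });
console.log(JSON.stringify(plainFrame));
console.log(JSON.stringify(namedFrame));
```

    Note that an event carrying an event field is delivered in the browser to listeners registered with source.addEventListener for that name, not to source.onmessage, so the client in this article would need an extra listener if named events were introduced.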
    
    

    The main application – in file topNreport.js does a few things:

    • it serves static HTML resources in the public subdirectory (which only contains the index.html document)
    • it implements the /topn/updates API where clients can register for SSE updates (that are collected in the sseClients Topic)
    • it consumes messages from the Kafka Topic Top3CountrySizePerContinent and pushes each received message as SSE event to all SSE clients
    • it schedules a function for periodic execution (once every 10 seconds at the moment); whenever the function executes, it sends a heartbeat event to all SSE clients

     

    /*
    This program serves a static HTML file (through the Express framework on top of Node). The browser that loads this HTML document registers itself as an SSE client with this program.
    
    This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.
    
    This program reports to all its SSE clients the latest update (or potentially a periodic top 3 of the largest countries per continent, with a configurable interval)
    
     
    */
    
    var express = require('express')
      , http = require('http')
      , sseMW = require('./sse');
    
    var kafka = require('kafka-node')
    var Consumer = kafka.Consumer
    var client = new kafka.Client("ubuntu:2181/")
    var countriesTopic = "Top3CountrySizePerContinent";
    
    var app = express();
    var server = http.createServer(app);
    
    var PORT = process.env.PORT || 3000;
    var APP_VERSION = '0.9';
    
    server.listen(PORT, function () {
      console.log('Server running, version '+APP_VERSION+', Express is listening... at '+PORT+" ");
    });
    
     // Realtime updates
    var sseClients = new sseMW.Topic();
    
    
    app.use(express.static(__dirname + '/public'))
    app.get('/about', function (req, res) {
        res.writeHead(200, {'Content-Type': 'text/html'});
        res.write("Version "+APP_VERSION+". No Data Requested, so none is returned");
        res.write("Supported URLs:");
        res.write("/public , /public/index.html ");
        res.write("incoming headers" + JSON.stringify(req.headers)); 
        res.end();
    });
    //configure sseMW.sseMiddleware as function to get a stab at incoming requests, in this case by adding an sseConnection property to the response
    app.use(sseMW.sseMiddleware)
    
    // initial registration of SSE Client Connection 
    app.get('/topn/updates', function(req,res){
        var sseConnection = res.sseConnection;
        sseConnection.setup();
        sseClients.add(sseConnection);
    } );
    
    
    //send message to all registered SSE clients
    function updateSseClients(message) {
        sseClients.forEach(function(sseConnection) {
            sseConnection.send(message);
        });
    }// updateSseClients
    
    // send a heartbeat signal to all SSE clients, once every interval seconds (or every 3 seconds if no interval is specified)
    var initHeartbeat = function(interval) {
        setInterval(function()  {
            var msg = {"label":"The latest", "time":new Date()}; 
            updateSseClients( JSON.stringify(msg));
          }//interval function
        , interval?interval*1000:3000
        ); // setInterval 
    }//initHeartbeat
    
    // initialize heartbeat at 10 second interval
    initHeartbeat(10); 
    
    
    // Configure Kafka Consumer for Kafka Top3 Topic and handle Kafka message (by calling updateSseClients)
    var consumer = new Consumer(
      client,
      [],
      {fromOffset: true}
    );
    
    consumer.on('message', function (message) {
      handleCountryMessage(message);
    });
    
    consumer.addTopics([
      { topic: countriesTopic, partition: 0, offset: 0}
    ], () => console.log("topic "+countriesTopic+" added to consumer for listening"));
    
    function handleCountryMessage(countryMessage) {
        var top3 = JSON.parse(countryMessage.value);
        var continent = new Buffer(countryMessage.key).toString('ascii');
        top3.continent = continent;
        updateSseClients( top3);
    }// handleCountryMessage
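
    One detail in the listing above is easy to miss. Reading the code, it appears that initHeartbeat passes an already stringified message to updateSseClients, while Connection.send in sse.js calls JSON.stringify once more. The Top 3 messages are passed as objects and are therefore stringified only once. The sketch below replays both paths without any network I/O; frameFor and clientParse are hypothetical helpers and the time value is just a sample.

```javascript
// Replays the two encoding paths from the listing, without Express or a browser.
function frameFor(data) {
  // same expression as in Connection.prototype.send
  return "data: " + JSON.stringify(data) + "\n\n";
}
function clientParse(frame) {
  // strip the "data: " prefix and the closing "\n\n" to get what the browser exposes
  // as event.data, then parse it like the onmessage handler does
  var eventData = frame.slice("data: ".length, -2);
  return JSON.parse(eventData);
}

var heartbeat = { label: "The latest", time: "2017-02-19T06:33:00.000Z" }; // sample value
var viaHeartbeat = clientParse(frameFor(JSON.stringify(heartbeat)));       // stringified twice
var viaKafkaHandler = clientParse(frameFor({ continent: "Europe", nrs: [] })); // stringified once

console.log(typeof viaHeartbeat);    // string
console.log(typeof viaKafkaHandler); // object
console.log(viaHeartbeat.continent); // undefined
```

    Because the parsed heartbeat is a string without a continent property, the if (top3.continent) check in index.html skips it, so the heartbeat does not disturb the table.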
    

    Running the application

    In order to run the application, the Node application that publishes the basic country records to a Kafka Topic is started:

    image

    The Kafka Streaming Java application that derives the Top 3 per continent and produces it to a Kafka Topic is started:

    image

    And the Node application that consumes from the Top3 Topic and pushes SSE events to the browser clients is run:

    image

    After a little wait, the browser displays:

    image

    based on output from the Kafka Streams application:

    image

    When all messages have been processed from the countries2.csv input file, this is what the browser shows:

    image

    This is the result of all the individual top3 messages pushed as SSE events from the Node application to the browser client. The screenshot shows only one browser client; however, many browsers could have connected to the same Node server and have received the same SSE events simultaneously.

    image

    The post Node.js application using SSE (Server Sent Events) to push updates (read from Kafka Topic) to simple HTML client application appeared first on AMIS Oracle and Java Blog.

    Kafka Streams and NodeJS – Consuming and periodically reporting in Node.JS on the results from a Kafka Streams streaming analytics application

    Mon, 2017-02-13 09:22

    In several previous articles on Apache Kafka, Kafka Streams and Node.JS for interacting with Apache Kafka, I have described how to create a Node.JS application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I want to discuss a Node application that consumes the Top-N reports from the Kafka Topic to which the Kafka Streams application produces them, and periodically (once every X seconds) reports on the current standings.

    image

    The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.

    The Node application uses the npm module kafka-node (https://www.npmjs.com/package/kafka-node) for the interaction with Kafka.

    A new Client is created – based on the ZooKeeper connect string (ubuntu:2181/). Using the Client, a Consumer is constructed. The consumer is configured to consume from Topic Top3CountrySizePerContinent. A message handler is associated with the consumer, to handle messages on the topic.

    The messages consumed by the Node consumer have the following structure:

    {"topic":"Top3CountrySizePerContinent"
    ,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}"
    ,"offset":244
    ,"partition":0
    ,"key":{"type":"Buffer","data":[65,102,114,105,99,97]}
    }
    

    The key of the message is of type Buffer. We happen to know the key is actually a String (the name of the continent). We can extract the key like this:

    var continent = new Buffer(countryMessage.key).toString('ascii');
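
    The new Buffer(...) constructor has since been deprecated in Node.js in favour of Buffer.from. A minimal sketch of the same key extraction with Buffer.from follows; the helper name keyToString is hypothetical, and the byte values are the ones from the sample message shown above.

```javascript
// Sketch: decode a Kafka message key without the deprecated Buffer constructor.
// keyToString is a hypothetical helper name.
function keyToString(key) {
  if (key === null || key === undefined) { return null; } // message without a key
  if (typeof key === "string") { return key; }
  // a Buffer serialised to JSON looks like {"type":"Buffer","data":[...]}
  var bytes = Buffer.isBuffer(key) ? key : Buffer.from(key.data || key);
  return bytes.toString("utf8");
}

var fromJson = keyToString({ type: "Buffer", data: [65, 102, 114, 105, 99, 97] });
var fromBuffer = keyToString(Buffer.from("Europe"));
console.log(fromJson);   // Africa
console.log(fromBuffer); // Europe
```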

    The payload of the message – the top3 for the continent – is in the value property. It can be extracted easily:

    var top3 = JSON.parse(countryMessage.value);

    {"nrs":
      [
       {"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"}
      ,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"}
      ,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"}
      ,null
      ]
    }
    

    The object countrySizeStandings contains a property for each continent. The property is set equal to the top3 that was most recently consumed from the Kafka Topic Top3CountrySizePerContinent.

    countrySizeStandings[continent]=top3;

    Using the Node built-in setInterval(), the report() function is scheduled for execution every reportingIntervalInSecs seconds. This function writes the current data in countrySizeStandings to the console.

     

    /*
    
    This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.
    
    This program reports: top 3 largest countries per continent (periodically, with a configurable interval) 
    */
    
    
    var kafka = require('kafka-node')
    var Consumer = kafka.Consumer
    var client = new kafka.Client("ubuntu:2181/")
    
    var countriesTopic = "Top3CountrySizePerContinent";
    var reportingIntervalInSecs = 4;
    
    var consumer = new Consumer(
      client,
      [],
      {fromOffset: true}
    );
    
    consumer.on('message', function (message) {
      handleCountryMessage(message);
    });
    
    consumer.addTopics([
      { topic: countriesTopic, partition: 0, offset: 0}
    ], () => console.log("topic "+countriesTopic+" added to consumer for listening"));
    
    var countrySizeStandings = {}; // the global container for the most recent country size standings 
    
    function handleCountryMessage(countryMessage) {
        var top3 = JSON.parse(countryMessage.value);
        // extract key value from the Kafka message
        var continent = new Buffer(countryMessage.key).toString('ascii');
        // record the top3 for the continent indicated by the message key as current standing in the countrySizeStandings object
        countrySizeStandings[continent]=top3;
    }// handleCountryMessage
    
    // every reportingIntervalInSecs seconds, report on the current standings per continent
    function report() {
       var d = new Date();
       console.log("Report at "+ d.getHours()+":"+d.getMinutes()+ ":"+d.getSeconds());
       // loop over the keys (properties) in the countrySizeStandings map (object)
       for (var continent in countrySizeStandings) {
         if (countrySizeStandings.hasOwnProperty(continent)) {
            var line = continent+ ": ";
            var index = 1;
            countrySizeStandings[continent].nrs.forEach(function(c) {
              if (c) {
                line = line + (index++) +'. '+ c.name+ '('+c.size+'), ';
              }
            });
            console.log(line);
        }//if
      }//for
    }//report
    
    // schedule execution of function report at the indicated interval
    setInterval(report, reportingIntervalInSecs*1000);

     

    Running the end-to-end chain requires a running Kafka Cluster and three applications: the Node application that produces the country messages from the CSV file, the Kafka Streams Java application that derives the running Top 3 standings, and finally the Node application introduced in this article, which consumes the Top 3 standings and reports them to the console (as instructed in the ReadMe in the GitHub Repo):

    • node KafkaCountryProducer.js
    • java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
    • node KafkaCountryStreamsConsumer.js

    The KafkaCountryProducer.js Node application writes the messages it produces to Kafka to the console as well:

    image

    The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:

    image

    The outcome of the Kafka Streams analysis, as published to the Kafka Topic, is continuously consumed by the Node application and periodically reported to the console, updated with the latest findings (the interval is set by reportingIntervalInSecs; it was 30 seconds for the run shown here):

    image
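
    The bookkeeping in the consumer can also be tried without a Kafka Cluster. What follows is a minimal sketch under stated assumptions: the two messages are made-up samples (countries A, B and C do not come from the real topic), and the helper names handleMessage and formatStandings are mine, not part of the application in the GitHub Repo. It shows that a newer message for a continent simply replaces the older standing, and how the nrs array is turned into one report line.

```javascript
// Kafka-free sketch: the messages below are made-up samples, not real topic contents.
const standings = {}; // latest top 3 per continent

function handleMessage(message) {
  // the key may arrive as a Buffer or as a string; normalise it to a string
  const continent = Buffer.isBuffer(message.key)
    ? message.key.toString('ascii')
    : String(message.key);
  // a newer message for the same continent replaces the older standing
  standings[continent] = JSON.parse(message.value);
}

function formatStandings(current) {
  return Object.keys(current).map(function (continent) {
    const entries = current[continent].nrs
      .filter(function (c) { return c; }) // skip empty (null) slots
      .map(function (c, i) { return (i + 1) + '. ' + c.name + '(' + c.size + ')'; });
    return continent + ': ' + entries.join(', ');
  });
}

handleMessage({
  key: Buffer.from('Europe'),
  value: JSON.stringify({ nrs: [{ name: 'A', size: 30 }, { name: 'B', size: 20 }, null, null] })
});
handleMessage({
  key: 'Europe',
  value: JSON.stringify({ nrs: [{ name: 'C', size: 40 }, { name: 'A', size: 30 }, { name: 'B', size: 20 }, null] })
});

console.log(formatStandings(standings).join('\n'));
```

    Only the second message survives in the standings, which is exactly why the report always shows the most recent top 3 per continent.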

    The post Kafka Streams and NodeJS – Consuming and periodically reporting in Node.JS on the results from a Kafka Streams streaming analytics application appeared first on AMIS Oracle and Java Blog.

    Connecting Oracle Management Cloud with Oracle Enterprise Manager 13c

    Sun, 2017-02-12 10:00

    Let’s be clear about this: Oracle Management Cloud (OMC) is NOT a replacement for Oracle Enterprise Manager Cloud Control (OEM CC), or even an equivalent. Rumours are that this will be Oracle’s policy in a far away future, but in the meantime we focus on what they do best. OEM CC is a complete management solution for your Oracle environment; OMC is for monitoring and, most of all, analysing the monitored data in your Oracle environment.

    Oracle made it possible to connect these worlds by using the data of the repository of OEM CC in OMC. And that’s what this post is about.

    In a previous blog about monitoring Infrastructure with OMC I installed an OMC-cloud agent on a server with OEM CC with the repository database on it.

    Through this OMC-cloud agent it’s possible to monitor the assets in a nice GUI, but what I’d really like to do is use the data in the OEM CC repository for the analytical power of Oracle Management Cloud.

    My infrastructure monitoring has been working since that previous blog, thanks to the installed OMC-cloud agent. The question is, however: do I have to install an OMC-cloud agent on every node, and connect every node to OMC? Part of that is true. A cloud agent is necessary on every node, but they can all be directed to one node where a central gateway has been installed for the connection to OMC. And of course you can also install a data collector for information from the Oracle Enterprise Manager Repository.

    In the documentation of IT-analytics there’s a picture with quite a nice overview:

    image

    Another, maybe more explanatory image:

     

    image

     

    For now I’m interested in the data collector. This data collector collects different types of data from the Oracle Management Repository, including targets, target properties, metrics, performance metrics, and events.

    According to the doc I need to install the gateway and the data collector. In the former post I already downloaded the master installer for Infrastructure monitoring, and this is the same installer as for the gateway and data collector.

    The gateway

    For the gateway I chose to use port 1840.

    I performed the installation in two steps: download first, then install from the staged directory.

    ./AgentInstall.sh AGENT_TYPE=gateway AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' STAGE_LOCATION=/gwayagent -download_only

    This may take a few minutes and results in a gateway.zip of approx 290 MB.

    ./AgentInstall.sh AGENT_TYPE=gateway AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' AGENT_BASE_DIR=/omc_gway AGENT_PORT=1840 -staged

    Generating emaas.properties …
    Extracting Agent Software …
    Installing the Agent …
    Registering the Agent …
    Downloading Certificates …
    Configuring the Agent …
    Cleanup temporary files …
    The following configuration scripts need to be executed as the root user
    #!/bin/sh
    #Root script to run
    /omc_gway/core/1.12.0/root.sh

    That was quite easy.

    The data collector

    Same recipe: first the download, then the install.

    ./AgentInstall.sh AGENT_TYPE=data_collector AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' STAGE_LOCATION=/gwayagent -download_only

    This results in a lama.zip, the same as for the cloud_agent, so in fact there is no need to download again!

    ./AgentInstall.sh AGENT_TYPE=data_collector AGENT_REGISTRATION_KEY='RMxMm7chywi-J-VZ7_UfxY5XUU' AGENT_BASE_DIR=/omc_dc GATEWAY_HOST=ovamisux159.amis.local GATEWAY_PORT=1840 EM_AGENT_NAME=ovamisux159.amis.local:1830 HARVESTER_USERNAME=SYSMAN_EMAAS_3 OMR_USERNAME=sys OMR_HOSTNAME=ovamisux159.amis.local OMR_HOST_USER_PASSWORD=welcome1 OMR_PORT=1521 OMR_SERVICE_NAME=cloudcontrol OMR_HOST_USERNAME=oracle OMR_STAGE_DIR=/gwayagent -staged

    Enter HARVESTER_USER_PASSWORD:
    Enter OMR_USER_PASSWORD:

    Generating emaas.properties …
    Extracting Agent Software …
    Installing the Agent …
    Registering the Agent …
    Downloading Certificates …
    Configuring the Agent …
    Cleanup temporary files …
    The following configuration scripts need to be executed as the root user
    #!/bin/sh
    #Root script to run
    /omc_dc/core/1.12.0/root.sh

    Here I was a bit confused by the use of OMR_USERNAME. The documentation of the agent_install script states that this is “The Oracle Management Repository user name”. But this should be a user with SYSDBA privileges, needed to install the Harvester schema (in this case SYSMAN_EMAAS_3).

    O.k. done.

    A recap: what is running on my server at this moment:

    OEM CC – the Oracle Enterprise Manager – Management Server

    OEM CC – the Oracle Enterprise Manager repository database 12.1

    OEM CC – the Oracle Enterprise Manager agent

    OMC – cloud agent

    OMC – gateway (agent)

    OMC – data collector (agent)

    And the data collector is showing itself in the Oracle Management Cloud:

    image

     

    Then it’s time to add entities to the IT analytics-datawarehouse according to the doc. This time there is no need for OMCLI-commands; now it’s time to return to the GUI of the OMC.

    Here it’s called ‘Enabling Services’ on Entities.

     

    Be aware: only the items which were added through the infrastructure monitoring (through OMCLI) can be enabled as a service!

    A small documentation bug: I seem to miss a functionality: the IT Analytics Administration link.

    According to the documentation:

    As an IT Analytics administrator, you can add database targets to the IT Analytics warehouse for AWR collection.
    To add targets, do the following:
    From the Oracle Management Cloud home page, click the Application Navigator icon in the upper-right corner of the page.
    From the Administration group, click the IT Analytics Administration link

    I found out that this link is no longer called “IT Analytics Administration”; it is just “Administration”.
    From there, the link “Enable/Disable Services” is used to add entities/targets to the ITA Warehouse.

    So… this one:

    image

    image

    image

    image

    image

    And there it is:

     

    image

     

    But, no data is yet to be seen. Is the data collector healthy and well-configured?

    [oracle@ovamisux159 bin]$ ./omcli status agent
    Oracle Management Cloud Data Collector
    Copyright (c) 1996, 2016 Oracle Corporation.  All rights reserved.
    —————————————————————
    Version                : 1.12.0
    State Home             : /omc_dc/agent_inst
    Log Directory          : /omc_dc/agent_inst/sysman/log
    Binaries Location      : /omc_dc/core/1.12.0
    Process ID             : 27293
    Parent Process ID      : 27235
    URL                    : https://ovamisux159.amis.local:1830/emd/main/
    Started at             : 2016-12-30 12:26:01
    Started by user        : oracle
    Operating System       : Linux version 3.8.13-55.1.6.el7uek.x86_64 (amd64)
    Data Collector enabled : true
    Sender Status          : FUNCTIONAL
    Gateway Upload Status  : FUNCTIONAL
    Last successful upload : 2017-01-22 17:03:01
    Last attempted upload  : 2017-01-22 17:03:01
    Pending Files (MB)     : 0.01
    Pending Files          : 16
    Backoff Expiration     : (none)

    —————————————————————
    Agent is Running and Ready
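
    Reading this status by eye works, but it can also be checked from a script. The following is a minimal sketch, assuming the status output keeps the "Key : value" layout shown above; the function names parseAgentStatus and isUploadHealthy are hypothetical helpers of my own and not part of omcli.

```javascript
// Parse "Key   : value" lines, as printed by the status command, into an object.
function parseAgentStatus(text) {
  const status = {};
  text.split('\n').forEach(function (line) {
    // require whitespace before the colon so that URLs and timestamps are not split
    const match = /^([^:]+?)\s+:\s*(.*)$/.exec(line.trim());
    if (match) {
      status[match[1]] = match[2];
    }
  });
  return status;
}

// Treat the agent as healthy when both upload-related statuses are FUNCTIONAL.
function isUploadHealthy(status) {
  return status['Sender Status'] === 'FUNCTIONAL' &&
    status['Gateway Upload Status'] === 'FUNCTIONAL';
}

// sample lines copied from the output above
const sampleOutput = [
  'Version                : 1.12.0',
  'Started at             : 2016-12-30 12:26:01',
  'Sender Status          : FUNCTIONAL',
  'Gateway Upload Status  : FUNCTIONAL',
  'Pending Files          : 16',
  'Agent is Running and Ready'
].join('\n');

const agentStatus = parseAgentStatus(sampleOutput);
console.log(agentStatus['Started at'], isUploadHealthy(agentStatus));
```

    In a real check the text would come from running ./omcli status agent; here it is a hard-coded sample so the sketch stays self-contained.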

    Are  there any connectivity issues?

    [oracle@ovamisux159 bin]$ ./omcli status agent connectivity -verbose
    Oracle Management Cloud Data Collector
    Copyright (c) 1996, 2016 Oracle Corporation.  All rights reserved.
    —————————————————————
    Data Collector is Running

    No significant connectivity issues found between Data Collector and Gateway.
    Check the connectivity status of the Gateway: ovamisux159.amis.local

    Ahh.. forgot one thing:

    Following the note “Enabling Host Monitoring for the Infrastructure Monitoring Cloud Service (Doc ID 2195074.1)”, I changed the line with ‘omc_host_linux’ in the file /omc_dc/plugins/oracle.em.sgfm.zip/1.12.0/configs/discovery.properties. These are the steps (the path below has /u01/app/oracle/omc_dc as base directory; on my server that is /omc_dc):

    navigate to directory:

    cd /u01/app/oracle/omc_dc/plugins/oracle.em.sgfm.zip/1.12.0/configs

    open a file for editing in vi

    vi discovery.properties

    comment out the parameter: disable_monitoring_for_entitytype=omc_host_linux

    save the modified file.

    Activate the changes by stopping and starting the agent:

    ./omcli stop agent

    ./omcli start agent
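
    The edit itself is small enough to script. This is only a sketch of the "comment out the parameter" step: commentOutProperty is a hypothetical helper, some_other_setting is a made-up line standing in for the rest of the file, and reading and writing the real discovery.properties is left out on purpose.

```javascript
// Comment out every active line that sets the given key; other lines stay untouched.
function commentOutProperty(text, key) {
  return text.split('\n').map(function (line) {
    // lines that are already commented start with '#', so they do not match
    return line.trim().indexOf(key + '=') === 0 ? '#' + line : line;
  }).join('\n');
}

// made-up file content for illustration
const before = [
  'some_other_setting=true',
  'disable_monitoring_for_entitytype=omc_host_linux'
].join('\n');

const after = commentOutProperty(before, 'disable_monitoring_for_entitytype');
console.log(after);
```

    Running the function a second time changes nothing, so it is safe to repeat.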

     

    Checking the log file /omc_dc/agent_inst/sysman/log/gcagent.log : no errors.

     

    Are there any items that are added to OMC? When navigating to e.g. Enterprise Health –> Entities, I can see entities which I haven’t added manually to OMC, and which are obviously added by the OMC data collector.

    You may ask why the status is unknown. That’s because the OEM CC agent on my server died while I was having some other problems.

     

    image

     

    Nice to know: how does this data collector collect its data from the repository? The queries are recorded in the log file of the OMC data collector; here is an example.

    SELECT a.target_guid, a.snapshot_guid, a.snapshot_type, a.target_type, a.start_timestamp, c.timezone_region, b.ecm_snapshot_id, b.FILE_SYSTEM "_E_FILE_SYSTEM_"
    FROM mgmt_ecm_gen_snapshot a
    INNER JOIN mgmt_targets c ON (a.target_guid = c.target_guid)
    INNER JOIN emaas_whitelist_entities w
      ON (w.harvester_name = ? AND c.target_guid = w.member_entity_guid)
    INNER JOIN em_ecm_all_config_snapshots e
      ON (a.snapshot_guid = e.ecm_snapshot_id AND a.start_timestamp = e.collection_timestamp
          AND ((w.is_new = 0 AND e.saved_timestamp >= ? AND e.saved_timestamp < ?) OR (w.is_new = 1 AND e.saved_timestamp < ?)))
    INNER JOIN em_ecm_all_config_metrics d
      ON (d.ecm_snapshot_id = a.snapshot_guid AND d.table_name = ? AND d.collection_timestamp = e.collection_timestamp)
    LEFT OUTER JOIN EM_ESM_FILESYS b ON (a.snapshot_guid = b.ecm_snapshot_id)
    WHERE a.target_type = ?
    AND a.snapshot_type = ?
    AND a.is_current = ?
    AND a.target_guid IS NOT NULL
    ORDER BY a.target_guid

     

    O.k. I clearly reached my goal. OMC is connected with OEM CC. And that’s where it stops at the moment for this blogpost.

    But now what? I would have loved to show some more, but time is limited (and so is the trial period). Clicking around in OMC is not very intuitive and you clearly need some time to figure out how to use it and discover the added value. You also need time to figure out what all the agents on your system are doing, except consuming resources.

    A recap: the concept of OMC is still very promising; however, the GUI and the technical implementation need improvement. But hey, the developers of OMC have only just begun, compared with the maturity of OEM CC 13c.

    My major concern at the moment is the policy of Oracle. Their cloud-eagerness could lead to a decision to replace OEM CC with OMC, while these products could be complementary to each other, even reinforcing.

    Regardz.

     

    Resources:

    – Deploying Oracle Management Cloud for Infrastructure Monitoring: https://technology.amis.nl/2016/12/18/deploying-oracle-management-cloud-for-infrastructure-monitoring/

    – Documentation IT Analytics: http://docs.oracle.com/en/cloud/paas/management-cloud/emcad/setting-it-analytics.html

    – Oracle Management Cloud Master Note: (Doc ID 2092091.1)

    – IT Analytics Cloud Service Master Note (Doc ID 2107732.1)

    – Configuring cloud agent for log analytics on JCS instance: https://technology.amis.nl/2017/01/20/configuring-cloud-agent-for-log-analytics-on-jcs-instance-to-collect-weblogic-log-files/

    – Agent_install script: https://docs.oracle.com/en/cloud/paas/management-cloud/emcad/running-agentinstall-script.html

    The post Connecting Oracle Management Cloud with Oracle Enterprise Manager 13c appeared first on AMIS Oracle and Java Blog.

    Apache Kafka Streams – Running Top-N Aggregation grouped by Dimension – from and to Kafka Topic

    Sat, 2017-02-11 22:02

    This article explains how to implement a streaming analytics application using Kafka Streams that performs a running Top N analysis on a Kafka Topic and produces the results to another Kafka Topic. Visualized, this looks like this:

    image

    Two previous articles are relevant as reference:

    This GitHub Repo contains the sources for this article: https://github.com/lucasjellema/kafka-streams-running-topN.

    Note that almost all aggregations are specializations of this top-N: min, max, sum, avg and count are all simple top-1 aggregations that can be implemented using a simplified version of this code.
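
    To make the top-1 idea concrete, here is a minimal sketch in plain JavaScript rather than Kafka Streams. It covers only the min and max cases; the topN factory and the three sample countries A, B and C are my own illustrative assumptions and do not appear in the repository.

```javascript
// Returns an aggregator that keeps the best n items according to compare (best first).
function topN(n, compare) {
  return function (current, item) {
    // add the new item, re-rank, and cut the list back to n entries
    return current.concat([item]).sort(compare).slice(0, n);
  };
}

const bySizeDesc = function (a, b) { return b.size - a.size; };
const bySizeAsc = function (a, b) { return a.size - b.size; };

// made-up sample data
const sampleCountries = [
  { name: 'A', size: 5 },
  { name: 'B', size: 9 },
  { name: 'C', size: 2 }
];

const runningMax = sampleCountries.reduce(topN(1, bySizeDesc), []);
const runningMin = sampleCountries.reduce(topN(1, bySizeAsc), []);
const runningTop2 = sampleCountries.reduce(topN(2, bySizeDesc), []);

console.log(runningMax[0].name, runningMin[0].name, runningTop2.map(function (c) { return c.name; }));
```

    With n set to 1 the ranked list degenerates into a single running value, which is the simplification the note above refers to.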

    To get started, go through the steps described in my previous article. This will result in an App.java class, that we can flesh out.

    package nl.amis.streams.countries;

    import nl.amis.streams.JsonPOJOSerializer;
    import nl.amis.streams.JsonPOJODeserializer;

    // generic Java imports
    import java.util.Properties;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Arrays;
    // Kafka imports
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.serialization.Deserializer;
    // Kafka Streams related imports
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    import org.apache.kafka.streams.kstream.Window;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.Windows;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class App {
        static public class CountryMessage {
            /* the JSON messages produced to the countries Topic have this structure:
             { "name" : "The Netherlands"
             , "code" : "NL"
             , "continent" : "Europe"
             , "population" : 17281811
             , "size" : 42001
             };

            this class needs to have at least the corresponding fields to deserialize the JSON messages into
            */

            public String code;
            public String name;
            public int population;
            public int size;
            public String continent;
        }

        static public class CountryTop3 {

            public CountryMessage[] nrs = new CountryMessage[4];
            public CountryTop3() {}
        }

        private static final String APP_ID = "countries-top3-kafka-streaming-analysis-app";

        public static void main(String[] args) {
            System.out.println("Kafka Streams Top 3 Demonstration");

            // Create an instance of StreamsConfig from the Properties instance
            StreamsConfig config = new StreamsConfig(getProperties());
            final Serde<String> stringSerde = Serdes.String();
            final Serde<Long> longSerde = Serdes.Long();

            // define countryMessageSerde
            Map<String, Object> serdeProps = new HashMap<String, Object>();
            final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
            serdeProps.put("JsonPOJOClass", CountryMessage.class);
            countryMessageSerializer.configure(serdeProps, false);

            final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
            serdeProps.put("JsonPOJOClass", CountryMessage.class);
            countryMessageDeserializer.configure(serdeProps, false);
            final Serde<CountryMessage> countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

            // define countryTop3Serde
            serdeProps = new HashMap<String, Object>();
            final Serializer<CountryTop3> countryTop3Serializer = new JsonPOJOSerializer<>();
            serdeProps.put("JsonPOJOClass", CountryTop3.class);
            countryTop3Serializer.configure(serdeProps, false);

            final Deserializer<CountryTop3> countryTop3Deserializer = new JsonPOJODeserializer<>();
            serdeProps.put("JsonPOJOClass", CountryTop3.class);
            countryTop3Deserializer.configure(serdeProps, false);
            final Serde<CountryTop3> countryTop3Serde = Serdes.serdeFrom(countryTop3Serializer, countryTop3Deserializer);

            // building Kafka Streams Model
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            // the source of the streaming analysis is the topic with country messages
            KStream<String, CountryMessage> countriesStream =
                kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");

            // A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
            // The window's name -- the string parameter -- is used to e.g. name the backing state store.
            long windowSizeMs = 5 * 60 * 1000L;
            long advanceMs = 1 * 60 * 1000L;
            TimeWindows.of("hopping-window-example", windowSizeMs).advanceBy(advanceMs);

            // THIS IS THE CORE OF THE STREAMING ANALYTICS:
            // top 3 largest countries per continent, published to topic Top3CountrySizePerContinent
            KTable<String, CountryTop3> top3PerContinent = countriesStream
                // the dimension for aggregation is continent; assign the continent as the key for each message
                .selectKey((k, country) -> country.continent)
                // for each key value (every continent in the stream) perform an aggregation
                .aggregateByKey(
                    // first initialize a new CountryTop3 object, initially empty
                    CountryTop3::new
                    , // for each country in the continent, invoke the aggregator, passing in the continent, the country element and the CountryTop3 object for the continent
                    (continent, countryMsg, top3) -> {
                        // add the new country as the last element in the nrs array
                        top3.nrs[3]=countryMsg;
                        // sort the array by country size, largest first
                        Arrays.sort(
                            top3.nrs, (a, b) -> {
                                // in the initial cycles, not all nrs element contain a CountryMessage object
                                if (a==null) return 1;
                                if (b==null) return -1;
                                // with two proper CountryMessage objects, do the normal comparison
                                return Integer.compare(b.size, a.size);
                            }
                        );
                        // lose nr 4, only top 3 is relevant
                        top3.nrs[3]=null;
                        return (top3);
                    }
                    , stringSerde, countryTop3Serde
                    , "Top3LargestCountriesPerContinent"
                );
            // publish the Top3 messages to Kafka Topic Top3CountrySizePerContinent
            top3PerContinent.to(stringSerde, countryTop3Serde, "Top3CountrySizePerContinent");

            // prepare Top3 messages to be printed to the console
            top3PerContinent.<String>mapValues((top3) -> {
                    String rank = " 1. "+top3.nrs[0].name+" - "+top3.nrs[0].size
                        + ((top3.nrs[1]!=null) ? ", 2. "+top3.nrs[1].name+" - "+top3.nrs[1].size : "")
                        + ((top3.nrs[2]!=null) ? ", 3. "+top3.nrs[2].name+" - "+top3.nrs[2].size : "")
                        ;
                    return "List for "+ top3.nrs[0].continent +rank;
                }
            )
            .print(stringSerde, stringSerde);

            System.out.println("Starting Kafka Streams Countries Example");
            KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
            kafkaStreams.start();
            System.out.println("Now started CountriesStreams Example");
        }

        private static Properties getProperties() {
            Properties settings = new Properties();
            // Set a few key parameters
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
            // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
            // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens
            settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
            // default serdes for serializing and deserializing key and value from and to streams in case no specific Serde is specified
            settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
            // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
            // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
            // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
            settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
            return settings;
        }

    }

    Some special attention for:

    • static class CountryTop3 – custom class to hold the actual top3 for a continent; objects based on this class are passed around in the aggregator, and are produced to the output stream & topic
      image
    • countryTop3Serde – defined for serializing CountryTop3 objects to the Kafka Topic, using the JsonPOJOSerializer that can translate a Java POJO to a JSON representation (that is subsequently serialized as String to the Kafka Topic)
    • aggregator implementation in lambda that performs the actual aggregation – the operation aggregateByKey is invoked with an Initializer that returns the initial instance of the CountryTop3 object (per continent) on which the aggregate will be built, an Aggregator that receives the continent, the next CountryMessage and the CountryTop3 object and updates the CountryTop3 object to include the new CountryMessage, the Serdes (serializer/deserializer) for the key and the value and a String that is the name of the resulting KTable.
      image
    • mapValues in order to create printable strings from the CountryTop3 object – the Lambda expression used in the call to mapValues gets a CountryTop3 object as input – the value in the top3PerContinent KTable – and maps it to a String. As a result, the KTable<String,CountryTop3> is mapped to a KTable<String,String> that is streamed to the print operation, using the stringSerde for serializing the two String values.
      image
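
    The aggregator logic (put the newcomer in slot four, sort with empty slots last, clear slot four again) can be tried outside Kafka Streams. This is a plain JavaScript sketch of that same mechanism, not the Java lambda itself; the countries W, X, Y and Z and their sizes are made up, and unlike the Java comparator it returns 0 when both slots are empty, a small addition of mine to keep the comparator consistent.

```javascript
// initializer: an empty top 3 with one spare slot for the newcomer
function newTop3() {
  return { nrs: [null, null, null, null] };
}

// aggregator: add the country, re-sort largest first, then drop number four
function aggregate(top3, country) {
  top3.nrs[3] = country;
  top3.nrs.sort(function (a, b) {
    if (a === null && b === null) return 0; // two empty slots are equal
    if (a === null) return 1;               // empty slots sink to the end
    if (b === null) return -1;
    return b.size - a.size;                 // largest size first
  });
  top3.nrs[3] = null;
  return top3;
}

// made-up sample stream for a single continent
const sampleStream = [
  { name: 'X', size: 10 },
  { name: 'Y', size: 30 },
  { name: 'Z', size: 20 },
  { name: 'W', size: 25 }
];

const sampleTop3 = sampleStream.reduce(aggregate, newTop3());
console.log(sampleTop3.nrs.slice(0, 3).map(function (c) { return c.name; }).join(', '));
```

    X enters the list first and is pushed out once three larger countries have arrived, which is the behaviour a running top 3 should show.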

     

     

    To run the application, go through these four command line steps:

    • (in an empty directory:)
      git clone https://github.com/lucasjellema/kafka-streams-running-topN
    • (navigate to directory kafka-streams-running-topN\Kafka-Streams-Country-TopN)
      mvn package
    • (in the same directory)
      mvn install dependency:copy-dependencies
    • (in the same directory)
      java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

     

     

    image

     

    Here is a screenshot of the Node.JS application busy producing country messages:

    image

    And here is some of the output produced by the Kafka Streams application:

    image

    Note how Mayotte enters at position one for the African continent, only to be quickly relegated by first Mozambique and then Namibia, only to disappear from the running top 3 when the message for Niger is consumed in the stream.

    You should also know that instead of simply pushing every change to the destination topic, we can use timing control to calculate aggregates over a time slice and/or produce outcomes only once every so often. I will demonstrate this in a subsequent article.
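
    As a first impression of what a time slice means, consider the hopping window mentioned in the comments of App.java: a size of 5 minutes with an advance interval of 1 minute. The sketch below computes which windows a given event timestamp falls into. It assumes windows aligned to timestamp 0 with an inclusive start and exclusive end, which is my understanding of hopping windows and not something taken from the Kafka Streams sources; hoppingWindowStarts is a hypothetical helper.

```javascript
// Returns the start times (ms) of all hopping windows that contain timestampMs,
// assuming windows start at multiples of advanceMs and cover [start, start + sizeMs).
function hoppingWindowStarts(timestampMs, sizeMs, advanceMs) {
  const starts = [];
  const latestStart = timestampMs - (timestampMs % advanceMs);
  for (let start = latestStart; start > timestampMs - sizeMs && start >= 0; start -= advanceMs) {
    starts.push(start);
  }
  return starts;
}

const windowSizeMs = 5 * 60 * 1000;
const advanceMs = 1 * 60 * 1000;

// an event at exactly 7 minutes belongs to the windows starting at minutes 7, 6, 5, 4 and 3
console.log(hoppingWindowStarts(7 * 60 * 1000, windowSizeMs, advanceMs));
```

    Every event therefore contributes to size divided by advance windows (five in this case), except near timestamp 0 where fewer windows exist.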

    Kafka Tool shows us the topics involved in this article:

    image

    • countries is the source, produced to by the Node.js application
    • Top3CountrySizePerContinent is the destination topic for the Kafka Streams application, to which the running Top 3 messages are produced
    • countries-topn-streaming-analysis-app-Top3LargestCountriesPerContinent-changelog is a Topic created by Kafka Streams on the fly as store for intermediate results; the name of this Topic is derived from the (intermediate) KTable created in the streaming application.

    By routing the KTable to a Topic, all change events on the table are produced to the Topic. What I would have liked to be able to do is have only the latest message for each key – in this case the most recent top 3 for each continent – on the Topic. That is not what Kafka Streams does for me, not even when I am producing a KTable as opposed to a KStream. One thing I can do in this case is enable Log Compaction for the topic – although that is more like a hint to the Kafka engine than a strict instruction for removing older messages on the Topic for a key.
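
    A consumer that only cares about the latest top 3 per continent can do the collapsing itself. A minimal sketch, assuming the change events arrive in order as key/value pairs; latestPerKey and the three sample events are illustrative and not part of any Kafka API.

```javascript
// Collapse an ordered list of change events to the most recent value per key.
function latestPerKey(events) {
  const latest = new Map();
  events.forEach(function (event) {
    latest.set(event.key, event.value); // a later event overwrites the earlier one
  });
  return latest;
}

// made-up change events as they could appear on the topic
const changeEvents = [
  { key: 'Africa', value: 'top3 version 1' },
  { key: 'Europe', value: 'top3 version 1' },
  { key: 'Africa', value: 'top3 version 2' }
];

const latestStandings = latestPerKey(changeEvents);
console.log(latestStandings.get('Africa'), latestStandings.size);
```

    This is the view that log compaction approximates on the broker side, with the difference that compaction gives no guarantee about when older messages for a key are removed.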

    Note: the Kafka Streams application makes use of RocksDB – an embedded key-value store that runs inside the Java application – to hold intermediate results. RocksDB stores data locally in a directory that can be configured. During development, when you run the same analysis on the same set of test data, over and over again, you may get unexpected results, because RocksDB continues with the data it has retained from previous runs. It may be wise to delete the RocksDB local data repository regularly, by just deleting the directory:

    image
    Resources

    An interesting resource is the Kafka Streams example KafkaMusicExample.java on generating a running Top 5 of all songs being played.

    A good read is the article Processing Tweets with Kafka Streams https://www.madewithtea.com/processing-tweets-with-kafka-streams.html

    The post Apache Kafka Streams – Running Top-N Aggregation grouped by Dimension – from and to Kafka Topic appeared first on AMIS Oracle and Java Blog.

    Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic

    Sat, 2017-02-11 02:52

    Kafka Streams is a lightweight Java library for creating advanced streaming applications on top of Apache Kafka Topics. Kafka Streams provides easy to use constructs that allow quick and almost declarative composition by Java developers of streaming pipelines that do running aggregates, real time filtering, time windows and joining of streams. Results from the streaming analysis can easily be published to a Kafka Topic or to external destinations. Despite the close integration with Kafka and the many out of the box library elements, the application created with Kafka Streams is just a Java application that can be deployed and run wherever Java applications can run (which is of course virtually anywhere).

    In this article I will show you my first steps with Kafka Streams. I will create a simple Kafka Streams application that streams messages from a Kafka Topic, processes them after dimensioning them (grouping by a specific key) and then keeps a running count. The running count is produced to a second Kafka Topic (as well as written to the console). Anyone interested in the outcome of this streaming analysis can consume this topic – without any dependency on the Kafka Streams based logic.

    This next figure shows the application. Country messages – simple JSON messages that describe a country with properties such as name, continent, population and size – are produced to a Kafka Topic. This is done from a simple Node.JS application that reads the data from a CSV file and publishes the records to a Kafka Topic; this application is described in the article NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file. The Kafka Streams application consists of a single Java Class that creates a stream from the Kafka Topic. Elements in the stream are assigned a key – the continent – and are then counted-by-key. The result (the running count of countries per continent) is routed to an outbound stream that produces messages to a second Kafka Topic.

    image
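
    Before turning to the Java implementation, the select-key-then-count idea can be shown in a few lines of plain JavaScript. This is a sketch of the concept only, not of the Kafka Streams API: runningCountByKey is a hypothetical helper and the three country messages are made-up samples.

```javascript
// For every incoming message, derive its key and emit the updated count for that key.
function runningCountByKey(messages, selectKey) {
  const counts = {};
  return messages.map(function (message) {
    const key = selectKey(message);
    counts[key] = (counts[key] || 0) + 1;
    return { key: key, count: counts[key] };
  });
}

// made-up country messages
const sampleMessages = [
  { name: 'A', continent: 'Europe' },
  { name: 'B', continent: 'Asia' },
  { name: 'C', continent: 'Europe' }
];

const countUpdates = runningCountByKey(sampleMessages, function (country) {
  return country.continent;
});
console.log(countUpdates);
```

    Each input message leads to one output update, so a consumer of the destination topic sees the count per continent grow over time.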

     

    My starting point in this article is:

    • a running Kafka Cluster somewhere on a server (or as is actually the case, in a VM running on my laptop)
    • locally installed Java 8 (1.8.0_72)
    • locally installed Maven (3.2.5)
    • (optional) locally installed Node.js (6.9.4)

    image

    You will find all sources discussed in this article in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-getting-started .

    The steps discussed below:

    • Initialize a new Maven project
    • Add dependencies on Kafka Streams to Maven pom-file
    • Implement Java Application:
      • create KStream using KStreamBuilder for Kafka Topic countries
      • selectKey for messages in the KStream: continent
      • countByKey the messages in the KStream
      • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
      • print the running count messages to the console
    • Compile the application and build a JAR file – using mvn package (note: we will do a little tweaking on the exact dependencies)
    • Download all required JAR files needed for running the Kafka Streams application – using mvn install dependency:copy-dependencies
    • optional: Run NodeJS application to produce country messages to Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages or use Kafka Connect to bring country messages across to the countries Topic
    • Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven; this will produce messages on the Kafka Topic (which we can inspect, for example using Kafka Tool) and print messages to the console (that are even easier to inspect).

    If you want to inspect and perhaps edit the code but not necessarily create the application from scratch, you can clone the GitHub Repo: git clone https://github.com/lucasjellema/kafka-streams-getting-started.

    My starting point is the local file system, freshly cloned from a nearly empty GitHub repository. I have included the NodeJS application that will produce the country messages to the Kafka topic:

    image

    1. Initialize a new Maven project

     

    mvn archetype:generate -DgroupId=nl.amis.streams.countries -DartifactId=Kafka-Streams-Country-Counter -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
    

     

    image

    The result:

    image

     

    2. Add dependency on Kafka Streams to the Maven pom-file

    The Maven coordinates for Kafka Streams can be looked up on Maven Central: https://search.maven.org

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.0</version>
      </dependency>
    

     

    3. Implement Java Application
    • create KStream using KStreamBuilder for Kafka Topic countries
    • selectKey for messages in the KStream: continent
    • countByKey the messages in the KStream
    • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
    • print the running count messages to the console


    Or in detail:

     

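    package nl.amis.streams.countries;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    // JsonPOJOSerializer and JsonPOJODeserializer are assumed to live in this same package
    public class App {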
        static public class CountryMessage {
            /* the JSON messages produced to the countries Topic have this structure:
             { "name" : "The Netherlands"
             , "code" : "NL"
             , "continent" : "Europe"
             , "population" : 17281811
             , "size" : 42001
             }

            this class needs to have at least the corresponding fields to deserialize the JSON messages into
            */
    
            public String code;
            public String name;
            public int population;
            public int size;
            public String continent;
        }
    
        private static final String APP_ID = "countries-streaming-analysis-app";
    
        public static void main(String[] args) {
            System.out.println("Kafka Streams Demonstration");
    
            // Create an instance of StreamsConfig from the Properties instance
            StreamsConfig config = new StreamsConfig(getProperties());
            final Serde<String> stringSerde = Serdes.String();
            final Serde<Long> longSerde = Serdes.Long();
    
            // define countryMessageSerde
            Map<String, Object> serdeProps = new HashMap<>();
            final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
            serdeProps.put("JsonPOJOClass", CountryMessage.class);
            countryMessageSerializer.configure(serdeProps, false);
    
            final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
            serdeProps.put("JsonPOJOClass", CountryMessage.class);
            countryMessageDeserializer.configure(serdeProps, false);
            final Serde<CountryMessage> countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);
    
            // building Kafka Streams Model
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            // the source of the streaming analysis is the topic with country messages
            KStream<String, CountryMessage> countriesStream = 
                                           kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");
    
            // THIS IS THE CORE OF THE STREAMING ANALYTICS:
            // running count of countries per continent, published in topic RunningCountryCountPerContinent
            KTable<String,Long> runningCountriesCountPerContinent = countriesStream
                                                                     .selectKey((k, country) -> country.continent)
                                                                     .countByKey("Counts")
                                                                     ;
            runningCountriesCountPerContinent.to(stringSerde, longSerde,  "RunningCountryCountPerContinent");
            runningCountriesCountPerContinent.print(stringSerde, longSerde);
    
    
    
            System.out.println("Starting Kafka Streams Countries Example");
            KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
            kafkaStreams.start();
            System.out.println("Now started CountriesStreams Example");
        }
    
        private static Properties getProperties() {
            Properties settings = new Properties();
            // Set a few key parameters
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
            // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens 
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
            // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens 
            settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
            // default serdes for serializing and deserializing key and value from and to streams in case no specific Serde is specified
            settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
            // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
            // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
            // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
            settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
            return settings;
        }
    }

    Note: a Serde is an object that carries a serializer and a deserializer for a specific data type; it is used to serialize keys and values into messages on a Kafka topic and to deserialize them again. Whenever our Java client consumes or produces elements, the Serde for those elements has to be provided. In this case, I have crafted the countryMessageSerde for the CountryMessage Java class, which is instantiated from the JSON document that is the value of each consumed Kafka message. This Serde carries a serializer and a deserializer based on JsonPOJOSerializer and JsonPOJODeserializer, generic JSON-to-Java mappers that use the Jackson library.
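    To make the idea of a Serde concrete, here is a minimal plain-Java sketch of a serializer and deserializer pair. It deliberately does not use the Kafka Serde interface or Jackson: SimpleSerde and SerdeSketch are hypothetical names, and the byte layouts chosen (UTF-8 for strings, 8 bytes for longs) are assumptions of this sketch. It only shows the round trip that any Serde has to support: value to bytes and back again.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Plain-Java illustration only; SimpleSerde is a hypothetical type,
// not the Kafka Serde interface.
class SerdeSketch {

    // Bundles both directions of the conversion for one data type.
    static final class SimpleSerde<T> {
        private final Function<T, byte[]> serializer;
        private final Function<byte[], T> deserializer;

        SimpleSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        byte[] serialize(T value) {
            return serializer.apply(value);
        }

        T deserialize(byte[] bytes) {
            return deserializer.apply(bytes);
        }
    }

    // Strings travel as UTF-8 bytes (an assumption of this sketch).
    static final SimpleSerde<String> STRING_SERDE = new SimpleSerde<String>(
            text -> text.getBytes(StandardCharsets.UTF_8),
            bytes -> new String(bytes, StandardCharsets.UTF_8));

    // Longs travel as 8 big-endian bytes (an assumption of this sketch).
    static final SimpleSerde<Long> LONG_SERDE = new SimpleSerde<Long>(
            value -> ByteBuffer.allocate(Long.BYTES).putLong(value).array(),
            bytes -> ByteBuffer.wrap(bytes).getLong());

    public static void main(String[] args) {
        byte[] key = STRING_SERDE.serialize("Europe");
        byte[] count = LONG_SERDE.serialize(2L);
        System.out.println(STRING_SERDE.deserialize(key) + " -> " + LONG_SERDE.deserialize(count));
    }
}
```

    A String key and a Long value are also the types of the running count messages produced by the application.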

     

    4. Compile the application and build a JAR file – using mvn package

    (note: we will later on do a little tweaking on the exact dependencies and set the correct Java version)

     

    Add the following plugin to the Maven pom file to ensure that compilation targets Java 1.8 (Java 8); this is required for the lambda expressions (Java 8) and the diamond operator <> (Java 7) used in the code

    ...
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>
    

     

     

    5. Download all required JAR files needed for running the Kafka Streams application

    Run the following Maven command:

    mvn install dependency:copy-dependencies


    All JAR files that follow from the dependencies defined in the pom.xml file are downloaded to the directory Kafka-Streams-Country-Counter\target\dependency


     

    6. Produce Country Messages to Kafka Topic

    Optional: run the NodeJS application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages, or use Kafka Connect to bring country messages across to the countries Topic.


    7. Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven

    java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

    (note: on Linux, the semicolon separating the classpath entries should be a colon)
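    The separator difference can be checked from Java itself, because the JDK exposes the platform's classpath separator as File.pathSeparator. A minimal sketch (ClasspathSketch and its classpath method are hypothetical helper names, not part of the application):

```java
import java.io.File;

// Illustration only: joins classpath entries with the separator of the current platform.
class ClasspathSketch {

    // File.pathSeparator is ";" on Windows and ":" on Linux and macOS.
    static String classpath(String... entries) {
        return String.join(File.pathSeparator, entries);
    }

    public static void main(String[] args) {
        System.out.println(classpath(
                "target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar",
                "target/dependency/*"));
    }
}
```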

     

    I ran into several exceptions at this point. I will list them and show the resolutions:

    Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
            at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
            at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

            … 1 more
    Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\countries-streaming-analysis-app\0_0\.lock (The system cannot find the path specified)
            at java.io.RandomAccessFile.open0(Native Method)

     

    Add the following line in the Java code getProperties() method:

    settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\tmp"); // on Windows

    or

    settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp"); // on Linux
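    Instead of hard-coding a different path per operating system, the state directory could be derived from the JVM's temp directory and created up front. The following is only a sketch under the assumption that the exception is caused by a directory that does not exist; StateDirSketch and ensureStateDir are hypothetical names, and the subdirectory name is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustration only: picks a platform-independent state directory and makes sure it exists.
class StateDirSketch {

    // Returns <java.io.tmpdir>/<subDirectory>, creating it (and any missing parents) if needed.
    static String ensureStateDir(String subDirectory) {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), subDirectory);
        try {
            Files.createDirectories(dir);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not create state directory " + dir, e);
        }
        return dir.toString();
    }

    public static void main(String[] args) {
        System.out.println(ensureStateDir("kafka-streams-state"));
    }
}
```

    The returned path could then be used as the value for StreamsConfig.STATE_DIR_CONFIG in getProperties().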

    Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
            at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

    See: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo for details on this exception.

    Add the following line to the getProperties() method in the Java code:

    settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

      and the associated import:

      import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

      Exception in thread "StreamThread-1" java.lang.ExceptionInInitializerError
              at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
              at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
              at org.rocksdb.Options.<clinit>(Options.java:21)
              at org.apache.kafka.streams.state.internals.RocksDBStore.<init>(RocksDBStore.java:126)

      Caused by: java.lang.UnsupportedOperationException
              at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:40)
              at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:19)

       

      This exception can occur on Windows. The version of RocksDB that Kafka Streams 0.10.0.0 depends on does not include the required Windows DLL; RocksDB 4.9 does include that DLL.

      Add the RocksDB dependency to the Maven pom.xml:

          <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
          <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>4.9.0</version>
          </dependency>
      

       

      8. Run the Kafka Streams application

      Run (again)

      • mvn package
      • mvn install dependency:copy-dependencies
      • java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

       

      And now, finally, the running count is produced, both to the console and to the Kafka Topic (inspected here with Kafka Tool).

      The messages have a String type Key and a Long type value.

      Note: the topic countries-streaming-analysis-app-Counts-changelog is created by the Kafka Streams library as an intermediate change log for the running count. Instead of keeping the intermediate results only in memory, Kafka Streams produces them to a Kafka Topic as well.
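      Why a change log helps can be shown with a small plain-Java sketch. This is not how Kafka Streams is implemented: ChangelogSketch is a hypothetical class and a list stands in for the changelog topic. The point is only that when every state update is also appended to a log, the in-memory counts can be rebuilt by replaying that log.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only: a list plays the role of the changelog topic.
class ChangelogSketch {

    // In-memory state: the latest count per key.
    private final Map<String, Long> counts = new HashMap<>();
    // Every update to the state is also appended here, in order.
    private final List<Map.Entry<String, Long>> changelog = new ArrayList<>();

    // Increments the count for the key and records the new value in the changelog.
    long increment(String key) {
        Long newCount = counts.merge(key, 1L, Long::sum);
        changelog.add(new AbstractMap.SimpleImmutableEntry<String, Long>(key, newCount));
        return newCount;
    }

    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }

    // Returns a copy of the recorded updates.
    List<Map.Entry<String, Long>> changelog() {
        return new ArrayList<>(changelog);
    }

    // Rebuilds the in-memory counts by replaying the updates; the last entry per key wins.
    static ChangelogSketch restoreFrom(List<Map.Entry<String, Long>> entries) {
        ChangelogSketch restored = new ChangelogSketch();
        for (Map.Entry<String, Long> entry : entries) {
            restored.counts.put(entry.getKey(), entry.getValue());
        }
        return restored;
    }

    public static void main(String[] args) {
        ChangelogSketch live = new ChangelogSketch();
        live.increment("Europe");
        live.increment("Africa");
        live.increment("Europe");
        ChangelogSketch restored = ChangelogSketch.restoreFrom(live.changelog());
        System.out.println("Europe after restore: " + restored.count("Europe"));
    }
}
```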

       

      Resources

      Online Java Beautifier – http://www.tutorialspoint.com/online_java_formatter.htm

      The post Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic appeared first on AMIS Oracle and Java Blog.

      Workshop Apache Kafka – presentation and hands on labs for getting started

      Fri, 2017-02-10 00:09

      The AMIS SIG session on Apache Kafka (9th February 2017) took 25 participants by the hand on a tour of Apache Kafka. Through presentations, demonstrations and a hands-on workshop, we provided a hit-the-ground-running introduction to Apache Kafka, with Kafka Streams as a bonus. Maarten Smeets and Lucas Jellema were responsible for this workshop.

      All materials for the workshop are freely available. The sources and hands-on lab are available in a GitHub Repo: https://github.com/MaartenSmeets/kafka-workshop 

      The workshop covers Hello World with Kafka, interacting with Kafka from Java and Node.js, the Kafka REST proxy, Kafka Streams, a look under the hood (partitions, brokers, replication) and Kafka integration with Oracle Service Bus and Oracle Stream Analytics.


      The presentation is also available from SlideShare: http://www.slideshare.net/lucasjellema/amis-sig-introducing-apache-kafka-scalable-reliable-event-bus-message-queue

      The post Workshop Apache Kafka – presentation and hands on labs for getting started appeared first on AMIS Oracle and Java Blog.
