Oracle BDSG Spark: Hands-on Lab                               

Contents

Introduction

Lab Part 1: Use Oracle Big Data Spatial and Graph Vector Console

Create Spatial Index

Explore Spatial data

Lab Part 2: Use Oracle Big Data Spatial and Graph Vector REST Spatial API

Create Spatial Index with GeoJSON file

Use the service response as data source for the Map API

Lab Part 3: Use the Spark Shell with the Oracle Big Data Spatial and Graph Vector Scala API

Spatial Join using data from HDFS and from the Oracle Database

Use the Streaming API to monitor results in real time

Lab Part 4: Create Customized Jobs Using the Oracle Big Data Spatial and Graph Vector Java API

Create a Job to Filter Spatial and Non Spatial Data with Spark SQL

Run a Nearest Neighbors Analysis

Concluding comments

 

Introduction

Welcome to the Hands-on Introduction to the Spark spatial feature of Oracle Big Data Spatial and Graph.

Time to Complete

Perform all parts – 67 Minutes

This lab has four parts:

  1. Use Oracle Big Data Spatial and Graph Vector Console (15 mins)
  2. Use Oracle Big Data Spatial and Graph Vector REST Spatial API (10 mins)
  3. Use the Spark Shell with the Oracle Big Data Spatial and Graph Vector Scala API (26 mins)
  4. Create Customized Jobs Using the Oracle Big Data Spatial and Graph Vector Java API (16 mins)

Part one uses the console to run jobs and visualize the results. Part two adds more customization and integrates the response with the Map API using the REST Spatial API. Finally, parts three and four walk you through the Scala and Java spatial APIs.

These parts can be completed in any order, so if you are particularly interested in one of them, do that one first in case you run out of time.

Note: One concept that is common to all the sections and needs to be understood before beginning this lab is the spatial index. The spatial index in Spark is like a spatial index in the database: it supports the same operations as a plain Spatial RDD, but faster. More information about the spatial index can be found in the Big Data Spatial and Graph User's Guide section “Spatially Indexing a Spatial RDD”.
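For reference, indexing a SpatialRDD from the Spark shell with the Scala API used in Part 3 looks like the following minimal sketch (it assumes a SpatialRDD named spatialRDD has already been built; there is no need to run it now):

import oracle.spatial.spark.vector.scala.index.DistributedSpatialIndex
import oracle.spatial.spark.vector.index.quadtree.QuadTreeConfiguration

// the index creation expects an implicit SparkContext in scope (sc in the Spark shell)
@transient implicit val sparkContext = sc

// build a distributed spatial index over the RDD using a quadtree configuration
val index = DistributedSpatialIndex.createIndex(spatialRDD, new QuadTreeConfiguration())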

Let’s get started.

1.       Install Big Data Lite from http://www.oracle.com/technetwork/database/bigdata-appliance/oracle-bigdatalite-2104726.html and log in. These steps apply to version 4.9 and later.

2.    Copy Vector-Spark-HOL.zip to the Big Data Lite Virtual Machine. You can do this by following the tip “Sharing folders between Big Data Lite and its host” in the Deployment Guide, available at http://www.oracle.com/technetwork/database/bigdata-appliance/bigdatalite-quickdeploy-430-2844847.pdf

3.       Inside Big Data Lite, open the web browser Firefox by clicking the icon below.

               

 

4.    In Firefox click the menu icon on the right. Select Preferences, then select the Content tab in the left panel and uncheck the Block pop-up windows checkbox.

 

5.       Open the terminal.

 

6.       Unzip the file Vector-Spark-HOL.zip to the directory /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL and make this directory writable.

 

sudo unzip Vector-Spark-HOL.zip -d /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL

 

sudo chmod -R 777 /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL

 

7.       Start Tomcat and verify Spatial consoles are started.

sudo service bdsg start

You may see a message like this:

Application started successfully.

In addition, you can check the application status with the following command.

sudo service bdsg status

Note: If you run the status command and the console has started successfully, you will see a message confirming it.

8.       Create what will be our working directory in HDFS:

hadoop fs -mkdir /user/oracle/Spark-HOL

 

9.   Now load the data that we will use during our examples into HDFS:

hadoop fs -put /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/data/tweets.json /user/oracle/Spark-HOL/tweets.json

 

hadoop fs -put /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/data/sf_hotels.json /user/oracle/Spark-HOL/sf_hotels.json

 

10. Finally, let's start the Oracle Database and load the restaurants data.

Open the services terminal by double-clicking the “Start/Stop Services” icon.

11. Use the arrow keys to navigate the services. Use the spacebar to select the Oracle Database service and click Ok.

12. Launch SQL Developer from the Desktop Toolbar menu, as shown here:

13. In SQL Developer open the file:

/opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/data/sample_restaurants.sql

 

14. Click the run script button. When prompted for a connection, select the bdsguser connection and click OK. This will complete the setup.

 

Lab Part 1: Use Oracle Big Data Spatial and Graph Vector Console

Here is the first part of the lab where you will use the console to:

1) Create Spatial Index (3 mins)

2) Explore Spatial Data (12 mins)

In this lab, the file /user/oracle/Spark-HOL/tweets.json will be used. This is a GeoJSON file with sample tweets. Each tweet contains the geometry information, a location text, and the number of followers and friends of the user who sent it.

Note: The API provides InputFormat and RecordInfoProvider implementations for the common formats GeoJSON and Esri Shapefile. It is also possible to use any Hadoop-provided or custom InputFormat and any custom RecordInfoProvider.
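As a preview of how these classes are used programmatically, the following minimal Spark-shell sketch builds a SpatialRDD from the tweets file with the same GeoJSON InputFormat and RecordInfoProvider that you will enter in the console form below (here reading only the followers_count property); there is no need to run it now, the same pattern is used step by step in Part 3:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import scala.collection.JavaConversions
import oracle.spatial.hadoop.vector.geojson.mapred.GeoJsonInputFormat
import oracle.spatial.spark.vector.recordinfoprovider.GeoJsonRecordInfoProvider
import oracle.spatial.spark.vector.scala.rdd.SpatialRDD

// read the GeoJSON file with the GeoJSON InputFormat, keeping only the record text
val textRDD = sc.hadoopFile("/user/oracle/Spark-HOL/tweets.json", classOf[GeoJsonInputFormat], classOf[LongWritable], classOf[Text]).map(tuple => { new Text(tuple._2) })

// the provider extracts the geometry (2 dimensions, SRID 8307) and the listed properties from each record
val recordInfoProvider = new GeoJsonRecordInfoProvider(JavaConversions.seqAsJavaList(List("followers_count")), 2, 8307)

val spatialRDD = SpatialRDD.fromRDD(textRDD, recordInfoProvider)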

 

Step 1: Open the web browser Firefox by clicking the icon below.

               

Note that if you need to open a new tab in Firefox, click the new tab (+) button.

 

Create Spatial Index

Now we will create a spatial index on the file /user/oracle/Spark-HOL/tweets.json.

1.       Open http://localhost:8045/spatialviewer/?root=vectorspark

2.       Specify all the required details:

a.       The index name: Let’s pretend these tweets are all from February, so the name of the new index for this example is TweetsFebruary.

b.       Path of data to the index:  Path of the file(s) to index in HDFS.  For this example we set /user/oracle/Spark-HOL/tweets.json

c.       New index path: This is the job output path. For this example we set /user/oracle/Spark-HOL/TweetsFebruary

d.       The SRID of the geometries used to build the index: 8307

e.       The tolerance of the geometries used to build the index: The tolerance reflects the distance that two points can be apart and still be considered the same (for example, to accommodate rounding errors). For this example we set 0.05

f.        Input Format class:  The InputFormat class implementation used to read the input data. For this example we set oracle.spatial.hadoop.vector.geojson.mapred.GeoJsonInputFormat

 

Note: If the data can be read with SparkContext.textFile (that is, one record per line), there is no need to set an InputFormat class.

 

g.       Input Format key class:  The InputFormat key class. For this example we set org.apache.hadoop.io.LongWritable

h.       Input Format value class:  The InputFormat value class. For this example we set org.apache.hadoop.io.Text

 

i.         Record Info Provider class: The class that provides the spatial information. For this example we set oracle.spatial.spark.vector.recordinfoprovider.GeoJsonRecordInfoProvider

 

 

 

3.       Click Create.

4.       When the following screen is displayed:

Click the URL http://localhost:8088 and wait until the job is completed successfully (refresh with F5 to see the job updates).

 

 

5.       Go back to the SpatialViewer and see the newly created index in the list of indexes (the table can take a minute to refresh):

Explore Spatial data

Before running this example, create the sample spatial index by performing the task Create Spatial Index. In this task we will view the tweets data on a map.

1.       Open http://localhost:8045/spatialviewer/?root=vectorspark

2.       Click on the section Explore Data.

3.       If you have internet connectivity then select the Background “World Map”:

4.       Select the index TweetsFebruary created in the task Create Spatial Index.

5.       Use the Rectangle tool to select a region in North America and wait till the result is loaded:

 

Note that if you move the map while the data is loading, the data will be refreshed (which runs extra jobs). This is a known issue and it will be fixed in the next version of the Map API.

 

6.       The results will be clustered. You can zoom in to see the actual records, pick one and click on it to see the details:

Lab Part 2: Use Oracle Big Data Spatial and Graph Vector REST Spatial API

In the prior section we showed how to run jobs and display their results in the vector console. Now let's see how to run jobs using the REST API and show the results using the Map API.

In this section we will use the command line to:

1)      Create Spatial Index with GeoJSON file (5 mins)

2)      Use the service response as data source for the Map API (5 mins)

 

Create Spatial Index with GeoJSON file

This task creates a spatial index using the /spatialviewer/api/v1/spark/createIndex service with the file /user/oracle/Spark-HOL/tweets.json as input. Let’s pretend these tweets are all from March, so the name of the new index is TweetsMarch. The arguments of the service are:

·         asynchronous: if true the index is created asynchronously, false by default.

·         indexName: The index name.

·         dataPath: the location of the data to be indexed.

·         indexPath: the location of the resulting spatial index.

·         srid: the Spatial Reference System id of the spatial data.

·         tolerance: a double value representing the tolerance used when performing spatial operations. The tolerance reflects the distance that two points can be apart and still be considered the same (for example, to accommodate rounding errors).

·         inputFormat (Optional): the InputFormat class implementation used to read the input data. If the input format class is not specified then the SparkContext's textFile is used.

·         keyClass: Class of the input format keys. This property is required if an input format is defined.

·         valueClass: Class of the input format values. This property is required if an input format is defined.

·         recordInfoProvider: the RecordInfoProvider implementation used to extract information from the records.

·         jarWithNeededClasses (Optional): the name of a jar containing the user-defined classes, such as a custom RecordInfoProvider or InputFormat. If the InputFormat class or the RecordInfoProvider class is not provided by the API or by the Hadoop API classes, then a jar with the user-defined classes must be provided. To be able to use this jar, add it to the /opt/oracle/oracle-spatial-graph/spatial/web-server/spatialviewer/WEB-INF/lib directory and restart the server.

 

1.       Open the terminal

2.       Type the command in the terminal:

curl -v -X POST "http://localhost:8045/spatialviewer/api/v1/spark/createIndex" -H  "accept: application/json" -H  "content-type: application/json" -d "{  \"indexName\": \"TweetsMarch\",  \"dataPath\": \"/user/oracle/Spark-HOL/tweets.json\",  \"indexPath\": \"/user/oracle/Spark-HOL/TweetsMarch\",  \"srid\": 8307,  \"tolerance\": 0.05,  \"inputFormatClass\": \"oracle.spatial.hadoop.vector.geojson.mapred.GeoJsonInputFormat\",  \"keyClass\": \"org.apache.hadoop.io.LongWritable\",  \"valueClass\": \"org.apache.hadoop.io.Text\",  \"recordInfoProviderClass\": \"oracle.spatial.spark.vector.recordinfoprovider.GeoJsonRecordInfoProvider\"}"

Note: If the output path already exists change "indexPath": "/user/oracle/Spark-HOL/TweetsMarch" to another path or remove the folder /user/oracle/Spark-HOL/TweetsMarch from HDFS.

 

3.       Open http://localhost:8088/cluster/apps in a new Firefox tab and wait until the job is completed successfully (refresh with F5 to see the job updates).

4.  Back in the terminal the response contains the path to the new index: hdfs://bigdatalite.localdomain:8020/user/oracle/Spark-HOL/TweetsMarch

 

Use the service response as data source for the Map API

Before running this example, create the sample index by performing the previous task Create Spatial Index with GeoJSON file. In this task we will use the /spatialviewer/api/v1/spark/filterByIndex service to filter the data in a query area and use the response as a data source for the Map API.

The arguments of the service are:

·         indexPath: The index path.

·         queryWindow: The queryWindow in GeoJSON format.

Note that the query window can be created using the Map API toolbar standard tools like the Rectangle tool:

var queryWindow = toolbar.getBuiltInTool(OM.control.ToolBar.BUILTIN_RECTANGLE).getGeometry().transform(8307).toString();

 

1.       Open the terminal

2.       Create a file named filterTweets.html:

sudo gedit filterTweets.html

3.       Add to the file the following HTML code, which uses the Map API to filter data in a given area. Note that the function showMap is called when the page is loaded (<body onload="javascript:showMap()">):

 

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">

 

<html style='width:100%;height:100%'>

<head>

<title></title>

<meta http-equiv='Content-Type' content='text/html; charset=UTF-8'>

<script type='text/javascript' src='http://localhost:8045/spatialviewer/api/v1/plugins/resource/oracle-maps/v2/oraclemapsv2.js'></script>

 

<script language="JavaScript" type="text/javascript">

   

    <!-- Specify the path to the Map API resources -->

    OM.gv.setResourcePath("/spatialviewer/api/v1/plugins/resource/oracle-maps/v2");

 

    <!-- Default http method when fetching remote geospatial contents -->

    OM.gv.setHttpMethod("GET");

 

    function showMap()

    {     

       <!-- Create the map -->

       var map = new OM.Map(

              document.getElementById('map'),

              {

                     universe : new OM.universe.ElocationUniverse(

                     {

                           srid : 8307,

                           bounds : new OM.geometry.Rectangle(-180,-90, 180,90,8307)

                     })

              }

       );

 

       <!-- Define the world countries layers as background layer -->

       var backgroundLayer = new OM.layer.VectorLayer(

              "backgroundLayer",

       {

              def : {

                     type : OM.layer.VectorLayer.TYPE_DATAPACK,

                     url : "/spatialviewer/templates/world_countries.json",

                     jsonp : true

              },

              boundingTheme : false

       });

 

 

       <!-- Add a navigation panel and add the backgroundLayer -->

       backgroundLayer.enableInfoWindow(false);

 

       var navigationPanelBar=new OM.control.NavigationPanelBar();

        map.addMapDecoration(navigationPanelBar);

 

       map.addLayer(backgroundLayer);

 

       <!-- Creates the tweets layer. The data are provided by the REST service /spatialviewer/api/v1/spark/filterByIndex-->  

       var tweetsLayer = new OM.layer.VectorLayer("Tweets",

       {

              def:

              {

                     type:OM.layer.VectorLayer.TYPE_DATAPACK,

                     url: '/spatialviewer/api/v1/spark/filterByIndex?indexPath=/user/oracle/Spark-HOL/TweetsMarch&queryWindow={"type":"Polygon", "coordinates":[[-110, 20, -110, 40, -100, 40, -100, 20, -110, 20]]}',

                     jsonp: true

              },

              boundingTheme: true

       });

                                             

 

       <!-- Add the layer -->    

       map.addLayer(tweetsLayer);

 

       <!-- Initialize the map -->

       map.init(); 

    }   

</script>

</head>

 

<body onload="javascript:showMap()">

    <DIV id=map style="width:99%;height:99%"></DIV>

</body>

</html>

4.       Use the file menu of the text editor to save and quit:

 

5.       Back in the terminal copy the html file to the spatialviewer Web content directory:

 

sudo cp filterTweets.html /opt/oracle/oracle-spatial-graph/spatial/web-server/spatialviewer

6.       Open the web browser Firefox by clicking the icon below and open http://localhost:8045/spatialviewer/filterTweets.html

 

Wait till the data are filtered (this can take a minute).

7.       When the filtering is done the data are displayed. Pick one and click on it to see the details.

 

Lab Part 3: Use the Spark Shell with the Oracle Big Data Spatial and Graph Vector Scala API

In the prior sections we showed how to run jobs and display the results using the SpatialViewer console and REST API. Now we will see how to use the Scala API to run spatial operations within the Spark Shell.

In this section we will:

1)      Spatial Join using data from HDFS and from the Oracle Database (14 mins)

2)      Use the Streaming API to monitor results in real time (12 mins)

 

To learn how to filter, create custom RecordInfoProviders, and more about creating jobs using the Scala API, review the examples in the folder /opt/oracle/oracle-spatial-graph/spatial/vector/examples.

Spatial Join using data from HDFS and from the Oracle Database

In this example we will create a spatial index using hotels data stored in HDFS. We will then build a SpatialRDD from restaurants data stored in the Oracle Database and create a buffer of 200 meters around each restaurant. Only the restaurants located on Columbus Avenue (San Francisco) with the postal code 94133 are considered.

The end result will be a join operation between those datasets to find the restaurants within a distance of 200 meters from the hotels.

1.       Open the terminal.

2.       We will take advantage of the jars option of the spark-shell command to make the JARs needed by the API available to the cluster. To that end, create an environment variable named SPARK_LIB_JARS that references those jars by typing the following commands in the terminal:

export API_LIB_DIR=/opt/oracle/oracle-spatial-graph/spatial/vector/jlib

export SPARK_LIB_JARS=$API_LIB_DIR/hadoop-spatial-commons.jar,$API_LIB_DIR/sdospark-vector.jar,$API_LIB_DIR/sdohadoop-vector.jar,$API_LIB_DIR/sdohadoop-vector-hive.jar,$API_LIB_DIR/sdoapi.jar,$API_LIB_DIR/sdoutl.jar,$API_LIB_DIR/ojdbc8.jar

3.       Make these same JARs available to the driver, which is the JVM created when you run the spark-shell command. To do this, set the SPARK_DRIVER_CLASS_PATH environment variable containing the needed jars:

export SPARK_DRIVER_CLASS_PATH=$API_LIB_DIR/hadoop-spatial-commons.jar:$API_LIB_DIR/sdospark-vector.jar:$API_LIB_DIR/sdohadoop-vector.jar:$API_LIB_DIR/sdohadoop-vector-hive.jar:$API_LIB_DIR/sdoapi.jar:$API_LIB_DIR/sdoutl.jar:$API_LIB_DIR/ojdbc8.jar

 

4.       Now open the Spark shell:

spark-shell --driver-class-path $SPARK_DRIVER_CLASS_PATH --jars $SPARK_LIB_JARS

 

5.       Note that you can copy/paste the commands into the Spark shell.
First import the needed classes:

import oracle.spatial.hadoop.vector.geojson.mapred.GeoJsonInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import oracle.spatial.spark.vector.recordinfoprovider.GeoJsonRecordInfoProvider
import scala.collection.JavaConversions
import oracle.spatial.spark.vector.scala.rdd.SpatialRDD
import oracle.spatial.spark.vector.scala.index.DistributedSpatialIndex
import oracle.spatial.spark.vector.index.quadtree.QuadTreeConfiguration
import oracle.spatial.spark.vector.scala.util.JDBCUtils
import java.sql.DriverManager
import oracle.spatial.spark.vector.util.ByPassRecordInfoProvider
import oracle.spatial.hadoop.vector.util.SpatialOperationConfig
import oracle.spatial.hadoop.vector.util.SpatialOperation
import oracle.spatial.spark.vector.SparkRecordInfo

6.       Specify the database URL, user name and password
val connURL = "jdbc:oracle:thin:@//localhost:1521/orcl"
val usr = "bdsguser"
val pwd = "welcome1"

 

7.       Load the Spatial RDD with the restaurants data from the Oracle Database. Only the restaurants on the street named COLUMBUS AVE with postal code 94133 will be loaded. Note that to load all the restaurants, replace the SQL query with the table name.

val jdbcSpatialRDD = JDBCUtils.createSpatialRDDFromQuery(
      sc, //spark context
      () => {
        Class.forName("oracle.jdbc.driver.OracleDriver");
        DriverManager.getConnection(connURL, usr, pwd);
      }, //DB connection supplier lambda
      "select * from POI_RESTAURANT where POI_POSTCODE=94133 and POI_STREET_NAME='COLUMBUS AVE'", //SQL query or table name if we want to query the whole table
      null //SparkRecordInfoProvider<ResultSet, SparkRecordInfo> (optional)
);

 

8.       Create a buffer of 200 meters for each restaurant’s geometry.
val bufferedRDD = SpatialRDD.fromRDD(
      jdbcSpatialRDD.map(r => {
        r.setGeometry(r.getGeometry().buffer(
            200.0, //number of meters
            0.05 //tolerance
            ))
        r
      }), new ByPassRecordInfoProvider())

 

9.       Load the hotels data from HDFS in a text file RDD.
val textRDD = sc.hadoopFile("/user/oracle/Spark-HOL/sf_hotels.json", classOf[GeoJsonInputFormat], classOf[LongWritable], classOf[Text]).map(tuple => { new Text(tuple._2) })

10.   Specify that the name of the hotel has to be added to the record extra fields. Then create the RecordInfoProvider and the spatial RDD with the hotels information.


val properties = List("NAME")

val recordInfoProvider = new GeoJsonRecordInfoProvider(JavaConversions.seqAsJavaList(properties),
        2, //dimensions
        8307 //SRID
        )

val spatialRDD = SpatialRDD.fromRDD(textRDD, recordInfoProvider)

 

11.   Specify an implicit Spark Context that will be used during the following spatial index creation.

@transient
implicit val sparkContext = sc

var index = DistributedSpatialIndex.createIndex(spatialRDD, new QuadTreeConfiguration())

 

12.   Create a spatial operation configuration and specify the operation as AnyInteract. Then perform the join to find the restaurants within a distance of 200 meters from the hotels.

val soc = new SpatialOperationConfig()

soc.setOperation(SpatialOperation.AnyInteract)

val joinedRDD = index.join[SparkRecordInfo, String](bufferedRDD, (r1, r2) => { "(" + r1.getField("NAME") + " - " + r2.getField("NAME") + ")" }, soc)

13.   Print the restaurants within a distance of 200 meters for each hotel.

joinedRDD.sortBy(c=>c).collect().foreach(println(_))

 

14.   Exit the Spark Shell:

exit

Use the Streaming API to monitor results in real time

In this example we will use a simulator that will send information about trucks every 10 seconds in the following CSV format:

{TRUCK_ID},{LOCATION_X_COORDINATE},{LOCATION_Y_COORDINATE},{CATEGORY}

For example:

1449,-122.09705,37.70886,10

The category is a number between 1 and 10.

Using the Spark shell we will create equipments in memory, each with an equipment id, a location and a category. The category is, as for the trucks, a number between 1 and 10.

Then the spatial streaming API will be used to print each equipment's five nearest trucks. Additionally, the trucks and the equipments must have the same category. New results will be available every 15 seconds.

1.       Open the terminal.

2.       Set the environment variable TRUCK_LOCATION_SERVICE_CLASS_PATH, which will contain the libraries needed by the truck location simulator.

 

export API_LIB_DIR=/opt/oracle/oracle-spatial-graph/spatial/vector/jlib

 

export TRUCK_LOCATION_SERVICE_CLASS_PATH=$HADOOP_CLASSPATH:/usr/lib/hadoop/client/hadoop-common.jar:$API_LIB_DIR/hadoop-spatial-commons.jar:$API_LIB_DIR/sdospark-vector.jar:$API_LIB_DIR/sdohadoop-vector.jar:$API_LIB_DIR/sdohadoop-vector-hive.jar:$API_LIB_DIR/sdoapi.jar:$API_LIB_DIR/sdoutl.jar:$API_LIB_DIR/ojdbc8.jar:/opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/streaming/truck-location-service.jar

3.       Start the truck location simulator
java -cp $TRUCK_LOCATION_SERVICE_CLASS_PATH oracle.spatial.spark.vector.demo.streaming.locationservice.TruckLocationServiceRunner

 

Once started the service will display the text Service started. Waiting for a client.

 

4.       Open a new terminal.

5.       We will take advantage of the jars option of the spark-shell command to make the JARs needed by the API available to the cluster. To that end, create an environment variable named SPARK_LIB_JARS that references those jars by typing the following command in the terminal:

 

export API_LIB_DIR=/opt/oracle/oracle-spatial-graph/spatial/vector/jlib

export SPARK_LIB_JARS=$API_LIB_DIR/hadoop-spatial-commons.jar,$API_LIB_DIR/sdospark-vector.jar,$API_LIB_DIR/sdohadoop-vector.jar,$API_LIB_DIR/sdohadoop-vector-hive.jar,$API_LIB_DIR/sdoapi.jar,$API_LIB_DIR/sdoutl.jar,$API_LIB_DIR/ojdbc8.jar

6.       Make these same JARs available to the driver, which is the JVM created when you run the spark-shell command. To do this, set the SPARK_DRIVER_CLASS_PATH environment variable containing the needed jars:

export SPARK_DRIVER_CLASS_PATH=$API_LIB_DIR/hadoop-spatial-commons.jar:$API_LIB_DIR/sdospark-vector.jar:$API_LIB_DIR/sdohadoop-vector.jar:$API_LIB_DIR/sdohadoop-vector-hive.jar:$API_LIB_DIR/sdoapi.jar:$API_LIB_DIR/sdoutl.jar:$API_LIB_DIR/ojdbc8.jar

 

7.       Now open the Spark shell:

spark-shell --driver-class-path $SPARK_DRIVER_CLASS_PATH --jars $SPARK_LIB_JARS

 

8.       Note that you can copy/paste the commands into the Spark shell.
First import the needed classes:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Seconds
import oracle.spatial.spark.vector.scala.streaming.dstream.SpatialDStream
import oracle.spatial.geometry.JGeometry
import oracle.spatial.hadoop.vector.util.SpatialOperation
import oracle.spatial.hadoop.vector.util.SpatialOperationConfig
import oracle.spatial.spark.vector.SparkRecordInfoProvider
import oracle.spatial.spark.vector.SparkRecordInfo
import oracle.spatial.hadoop.vector.util.GeoJsonGen

 

9.       Define the class Truck.

case class Truck (id: String, location: JGeometry, category: String)  extends Serializable

 

10.   Define the class Equipment.

case class Equipment(id: String, location: JGeometry, category: String) extends Serializable

 

11.   Define the RecordInfoProvider for the trucks. The SRID is 8307 and the record geometry is the truck location, which is a JGeometry object.

class TruckRecordInfoProvider extends SparkRecordInfoProvider[Truck]{

 

  @Override def getRecordInfo(truck: Truck, recordInfo: SparkRecordInfo) = {

    recordInfo.setGeometry(truck.location)

    true

  }

 

  @Override def getSrid = 8307 

  @Override def setSrid(srid: Int) = null  

}

 

 

 

12.   Define the object HOLUtils, which will contain the sample equipments data and a function to transform the CSV truck information sent by our simulator into a Truck object.

 

object HOLUtils extends Serializable{

 

  val sampleEquipments = List(Equipment("32035878",JGeometry.createPoint(Array(-122.40849,37.7972),2,8307) ,"6"),Equipment("898166896",JGeometry.createPoint(Array(-122.40816,37.79769),2,8307) ,"4"),Equipment("32046146",JGeometry.createPoint(Array(-122.40857,37.79742),2,8307) ,"9"),Equipment("17524196",JGeometry.createPoint(Array(-122.40738,37.7984),2,8307) ,"5"),Equipment("19448008",JGeometry.createPoint(Array(-122.40726,37.79901),2,8307) ,"6"),Equipment("32032234",JGeometry.createPoint(Array(-122.40824,37.79898),2,8307) ,"8"),Equipment("31993883",JGeometry.createPoint(Array(-122.40575,37.79726),2,8307) ,"7"),Equipment("17619807",JGeometry.createPoint(Array(-122.40632,37.797),2,8307) ,"9"))

 

  def parseTruck(csvLine: String) = {

     val items = csvLine.split(",").map(_.trim)

     val geom = JGeometry.createPoint(Array(items(1).toDouble, items(2).toDouble), 2, 8307)

     new Truck(items(0), geom, items(3))

   }

}

 

13.   Create the streaming context. Note that the data will be streamed every 15 seconds.

 

@transient val ssc = new StreamingContext(sc,Seconds(15))

 

14.   Specify the DStream that will get the truck data in CSV format and transform it into Truck objects.

 

@transient val truckStream : DStream[Truck] = ssc.socketTextStream("localhost",7777).map(csvLine=>{HOLUtils.parseTruck(csvLine)})

 

15.   Specify the SpatialDStream that will use the previously created DStream and the RecordInfoProvider to get the spatial information.

 

val spatialTruckStream : SpatialDStream[Truck] = SpatialDStream(truckStream, new TruckRecordInfoProvider())

 

16.   Define the following operation: for each equipment, find the 5 nearest trucks belonging to the same category (truck.category==bEq.value.category), then print the result.

 

val equipments : List[Equipment] = HOLUtils.sampleEquipments

 

for (eq <- equipments){

        val bEq = sc.broadcast(eq)

        val nnStream  : DStream[Seq[(Double,Truck)]] = spatialTruckStream.nearestNeighbors(truck=>{truck.category==bEq.value.category},5,new SpatialOperationConfig(SpatialOperation.None, eq.location, 0.05))

        nnStream.map(nnList=>{

                val msg = new StringBuilder("Nearest Neighbors for Equipment(")

                msg ++= " id="+bEq.value.id

                msg ++= ", category="+bEq.value.category

                msg ++= ", location="+GeoJsonGen.asGeometry(bEq.value.location)+")\n"

                for(nn<-nnList) msg ++= "\t Distance:"+nn._1+" Truck( id="+nn._2.id+", category="+nn._2.category+", location="+GeoJsonGen.asGeometry(nn._2.location)+")\n"

                msg

        }).print

}

17.   Start the execution of the streams. Wait until the connection is made and the results are displayed; this can take a minute. Initially the blocks may be empty; in that case wait 15 seconds for the next blocks. Note that the distance displayed in the results is the distance in meters between the truck and the equipment.

ssc.start

 

18.   Interrupt the streaming using Ctrl+C (press the C key while holding down the Ctrl key on your keyboard). Ignore any error thrown by the Spark shell when interrupting the streaming.

Note that the truck location simulator stops together with the streaming, so to test again the simulator has to be restarted.

Lab Part 4: Create Customized Jobs Using the Oracle Big Data Spatial and Graph Vector Java API

So far we have seen how to use the SpatialViewer features and how to use the Spark Shell with the spatial API. Now let's see how to create our own jobs in Java using the API framework.

In this section we will:

1)      Create a Job to Filter Spatial and Non Spatial Data with Spark SQL (10 mins)

2)      Run a Nearest Neighbors Analysis (6 mins)

For simplicity in the examples we won’t use any spatial index or any custom RecordInfoProvider.

To learn how to filter with an index, create custom RecordInfoProviders, and more about creating jobs using the Java API, review the examples in the folder /opt/oracle/oracle-spatial-graph/spatial/vector/examples.

You can find the additional classes used in those examples in the folder /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/java/src/

 

Step 1: Open JDeveloper from a terminal: click the terminal icon in the upper Task Bar (about 7 minutes for the following steps).

Step 2: Enter the following command:

sudo /u01/jdev/jdeveloper/jdev/bin/jdev

Step 3: Select the role Studio Developer (All Features) and click OK.

Step 4: Go to the Application menu and from the submenu list select New....

Step 5: Select Java Desktop Application and click OK

 

 

 

 

Step 6: Set the Application Name to SparkVectorApplication and click Next.

 

Step 7: Click Next.

 

Step 8: Set the project name to SparkVectorTest and click Next.

 

Step 9: Set the default package to hol and click Finish.

Step 10: Right click on the SparkVectorTest project in the Application Navigator and select Project Properties.

 

 Select the Libraries and Classpath option and click on Add JAR/Directory.

 

In the Add Archive/Directory window set the path to /opt/oracle/oracle-spatial-graph/spatial/vector/jlib and select all the jar files in this folder. Click Open. Then, in the same way, add the jar spark-assembly.jar located in /usr/lib/spark/lib. Finally, add all the Hadoop jars located in /usr/lib/hadoop/client. Now the project knows about the required files for vector processing. Click OK.

 

 

Step 11: Right click on the project and select Project Properties. In the window that appears, select the Deployment option and click the Create Deployment Profile icon. Select JAR File as the Profile Type, set vector_examples as the Deployment Profile Name, and click OK to finish.

 

 

Step 12: In the Edit JAR Deployment Profile Properties window that appears, set the jar file to /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/jlib/vector_examples.jar and click OK.

 

 

 

Step 13: Finally click OK in the Deployment screen.

Create a Job to Filter Spatial and Non Spatial Data with Spark SQL

The job will filter the tweets that are inside a query window and that have a number of followers higher than 50.

The class used in this example can be found in the following location: /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/java/src/SparkSQLFiltering.java.

Note: The spatial functions available for use with Spark SQL are listed in the Big Data Spatial and Graph User's Guide section “Hive Spatial Functions”.

 

1.       Right click on the SparkVectorTest project in the Application Navigator and select New. In the New Gallery window select Java class.

 

In the Create Java Class window, set SparkSQLFiltering as the Name for the class and click Ok.

 

2.       First, add the following import statements after the package line to avoid compilation errors:

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobConf;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.hive.HiveContext;

import oracle.spatial.hadoop.vector.util.JobUtils;

import oracle.spatial.spark.vector.SparkRecordInfo;

import oracle.spatial.spark.vector.rdd.SpatialJavaRDD;

import oracle.spatial.spark.vector.recordinfoprovider.GeoJsonRecordInfoProvider;

import oracle.spatial.hadoop.vector.geojson.mapred.GeoJsonInputFormat;

 

 

3.       Let's start programming the SparkSQLFiltering.java class by creating the main function. Add the following code before the bottom curly brace.

 

public static void main(String[] args) throws Exception{

            //Create a JobConf

            JobConf jobConf = new JobConf();

            //set input path

            JobUtils.setupInputPath(args[0], jobConf);

           

            // Create a Java Spark Context

            SparkConf conf = new SparkConf().setAppName("SparkSQLFiltering");

            JavaSparkContext sparkContext = new JavaSparkContext(conf);

           

            //load text file RDD

            JavaPairRDD<LongWritable, Text> textRDD = sparkContext.hadoopRDD(jobConf, GeoJsonInputFormat.class, LongWritable.class, Text.class);

 

            //list of GeoJSON properties to add to the record extra fields

            List<String> properties = new ArrayList<String>();

            properties.add("friends_count");

            properties.add("location");

            properties.add("followers_count");

            properties.add("user_id");

 

            //create RecordInfoProvider

            GeoJsonRecordInfoProvider recordInfoProvider = new GeoJsonRecordInfoProvider(properties, 2, 8307);

           

            //create a spatial RDD

            SpatialJavaRDD<SparkRecordInfo> spatialRDD = SpatialJavaRDD.fromJavaRDD(textRDD.values(), recordInfoProvider);

 

            HiveContext sqlContext = new HiveContext(sparkContext.sc());

           

            //get the spatial DataFrame from the SpatialRDD

            //the geometries are in GeoJSON format

            DataFrame spatialDataFrame = spatialRDD.createSpatialDataFrame(sqlContext, properties);

           

            // Register the DataFrame as a table.

            spatialDataFrame.registerTempTable("tweets");

            //register UDFs

            sqlContext.sql("create temporary function ST_Polygon as 'oracle.spatial.hadoop.vector.hive.ST_Polygon'");

            sqlContext.sql("create temporary function ST_Point as 'oracle.spatial.hadoop.vector.hive.ST_Point'");

            sqlContext.sql("create temporary function ST_Contains as 'oracle.spatial.hadoop.vector.hive.function.ST_Contains'");

           

            // SQL can be run over RDDs that have been registered as tables.

            StringBuffer query = new StringBuffer();

           

            query.append("SELECT geometry, friends_count, location, followers_count FROM tweets ");

            query.append("WHERE ST_Contains( ");

            query.append("  ST_Polygon('{\"type\": \"Polygon\",\"coordinates\": [[[-106, 25], [-106, 30], [-104, 30], [-104, 25], [-106, 25]]]}', 8307) ");

            query.append("  , ST_Point(geometry, 8307) ");

            query.append("  , 0.05)");

            query.append("  and followers_count > 50");

 

            DataFrame results = sqlContext.sql(query.toString());

           

            //Filter the tweets in a query window (somewhere in the north of Mexico)

            //and with more than 50 followers.

            //Note that since the geometries are in GeoJSON format it is possible to create the ST_Point like

            //ST_Point(geometry, 8307)

            //instead of

            //ST_Point(geometry, 'oracle.spatial.hadoop.vector.hive.json.GeoJsonHiveRecordInfoProvider')

 

            List<String> filteredTweets = results.javaRDD().map(new Function<Row, String>() {

              public String call(Row row) {

                      StringBuffer sb = new StringBuffer();

                      sb.append("Geometry: ");

                      sb.append(row.getString(0));

                     

                      sb.append("\nFriends count: ");

                      sb.append(row.getString(1));

                      sb.append("\nLocation: ");

                      sb.append(row.getString(2));

                      sb.append("\nFollowers count: ");

                      sb.append(row.getString(3));

 

                      return sb.toString();

              }

            }).collect();

 

            //print the filtered tweets

            filteredTweets.forEach(tweet -> System.out.println("Tweet: "+tweet));

           

            //close context

            sparkContext.close();

    }

 

4.       The code is ready!

5.       Right click on the project, select Deploy, and then select the vector_examples profile you just created.

 

 

6.       Click Finish. The JAR file is deployed and ready to use.

  7. Open the terminal.

  8. We will take advantage of the jars option of the spark-submit command to make the JARs needed by the API available to the cluster. To that end, create an environment variable named SPARK_LIB_JARS that references those jars by typing the following command in the terminal:

export API_LIB_DIR=/opt/oracle/oracle-spatial-graph/spatial/vector/jlib

 

export SPARK_LIB_JARS=$API_LIB_DIR/hadoop-spatial-commons.jar,$API_LIB_DIR/sdospark-vector.jar,$API_LIB_DIR/sdohadoop-vector.jar,$API_LIB_DIR/sdohadoop-vector-hive.jar,$API_LIB_DIR/sdoapi.jar,$API_LIB_DIR/sdoutl.jar,$API_LIB_DIR/ojdbc8.jar

  9. Make these same JARs available to the driver, which is the JVM created when you run the spark-submit command. To do this, set the SPARK_DRIVER_CLASS_PATH environment variable containing the needed jars:

 

export SPARK_DRIVER_CLASS_PATH=$API_LIB_DIR/hadoop-spatial-commons.jar:$API_LIB_DIR/sdospark-vector.jar:$API_LIB_DIR/sdohadoop-vector.jar:$API_LIB_DIR/sdohadoop-vector-hive.jar:$API_LIB_DIR/sdoapi.jar:$API_LIB_DIR/sdoutl.jar:$API_LIB_DIR/ojdbc8.jar

  10. Additionally, we will use a variable to set the Spark configuration that customizes the log output to make it less verbose.

export SPARK_JAVA_OPTIONS="-Dlog4j.configuration=file:///opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/light-log4j.properties"

 

  11. Now run the job by entering the following command in the terminal:

spark-submit --class hol.SparkSQLFiltering --driver-java-options "$SPARK_JAVA_OPTIONS" --driver-class-path $SPARK_DRIVER_CLASS_PATH --jars $SPARK_LIB_JARS /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/jlib/vector_examples.jar /user/oracle/Spark-HOL/tweets.json

 

  12. Three results are displayed. The resulting records have the same properties as in the input file tweets.json.

 

Run a Nearest Neighbors Analysis

This task will run a nearest neighbors analysis on the tweets. The job will retrieve the 10 tweets nearest to the center of Paris with a number of followers higher than 50.

The class used in this example can be found in the following location: /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/java/src/NearestNeighbors.java

 

1.       Right click on the SparkVectorTest project in the Application Navigator and select New. In the New Gallery window select Java class.

 

In the Create Java Class window, set NearestNeighbors as the Name for the class and click Ok.

 

2.       First, add the following import statements after the package line to avoid compilation errors:

 

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobConf;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import oracle.spatial.geometry.JGeometry;

import oracle.spatial.hadoop.vector.util.JobUtils;

import oracle.spatial.spark.vector.SparkRecordInfo;

import oracle.spatial.spark.vector.rdd.SpatialJavaRDD;

import oracle.spatial.spark.vector.recordinfoprovider.GeoJsonRecordInfoProvider;

import oracle.spatial.spark.vector.serialization.SpatialVectorKryoRegistrator;

import scala.Tuple2;

import oracle.spatial.hadoop.vector.geojson.mapred.GeoJsonInputFormat;

import oracle.spatial.hadoop.vector.util.SpatialOperation;

import oracle.spatial.hadoop.vector.util.SpatialOperationConfig;

import oracle.spatial.spark.vector.SparkRecordInfoProvider;

import org.apache.spark.api.java.JavaRDD;

 

3.       Let's start programming the NearestNeighbors.java class by creating the main function. Add the following code before the bottom curly brace.

 

public static void main(String[] args) throws Exception {

     //Create a JobConf

     JobConf jobConf = new JobConf();

     //set input path

     JobUtils.setupInputPath(args[0], jobConf);

     // Create a Java Spark Context

    SparkConf conf = new SparkConf().setAppName("NearestNeighbors");

 

    //use the Kryo Serializer

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

    //set SpatialVectorKryoRegistrator as Kryo Registrator

    SpatialVectorKryoRegistrator.register(conf);

 

    JavaSparkContext sparkContext = new JavaSparkContext(conf);

 

    //load text file RDD

    JavaRDD<Text> textRDD =

        sparkContext.hadoopRDD(jobConf,

        GeoJsonInputFormat.class, LongWritable.class, Text.class)

              .map(tuple->{return new Text(tuple._2().toString());});

 

    //list of GeoJSON properties to add to the record extra fields

    List<String> properties = new ArrayList<String>();

    properties.add("friends_count");

    properties.add("location");

    properties.add("followers_count");

    properties.add("user_id");

 

    //create RecordInfoProvider

    GeoJsonRecordInfoProvider recordInfoProvider =

        new GeoJsonRecordInfoProvider(properties, 2, 8307);

 

    //create a spatial RDD

    SpatialJavaRDD<SparkRecordInfo> spatialRDD =

        SpatialJavaRDD.fromJavaRDD(textRDD, recordInfoProvider);

 

    //create a point having the coordinates of Paris

    JGeometry centerParis =

        JGeometry.createPoint(

            new double[] { 2.349014, 48.864716 }, //coordinates of Paris

            2, //dimensions

            8307 //SRID

            );

 

 

    //determine the 10 nearest tweets of Paris

    //Note that the first element of the returned tuple is a double representing

    //the distance between the center of Paris and the tweet in meters

    List<Tuple2<Double, SparkRecordInfo>> nearestNeighbors =

        spatialRDD.nearestNeighbors(value -> {

             //return only tweets with a

             //followers_count attribute greater than 50

             return

                 Long.parseLong(value.getField("followers_count").toString())

                  > 50l;

        },

        10, //number of neighbors

        new SpatialOperationConfig(SpatialOperation.None,

                                   centerParis,

                                   0.05 //tolerance

                                   )

    );

             

    //print the returned tweets

    nearestNeighbors.forEach(

        nearestNeighbor ->

            System.out.println("Tweet: " + nearestNeighbor.toString())

    );

 

    //close context

    sparkContext.close();

}

 

4.       The code is ready!

5.       Right click on the project, select Deploy, and then select the vector_examples profile you just created.

 

 

6.       Click Finish. The JAR file is deployed and ready to use.

 

  7. Open the terminal.

  8. We will take advantage of the jars option of the spark-submit command to make the JARs needed by the API available to the cluster. To that end, create an environment variable named SPARK_LIB_JARS that references those jars by typing the following command in the terminal:

export API_LIB_DIR=/opt/oracle/oracle-spatial-graph/spatial/vector/jlib

 

export SPARK_LIB_JARS=$API_LIB_DIR/hadoop-spatial-commons.jar,$API_LIB_DIR/sdospark-vector.jar,$API_LIB_DIR/sdohadoop-vector.jar,$API_LIB_DIR/sdohadoop-vector-hive.jar,$API_LIB_DIR/sdoapi.jar,$API_LIB_DIR/sdoutl.jar,$API_LIB_DIR/ojdbc8.jar

  9. Make these same JARs available to the driver, which is the JVM created when you run the spark-submit command. To do this, set the SPARK_DRIVER_CLASS_PATH environment variable containing the needed jars:

 

export SPARK_DRIVER_CLASS_PATH=$API_LIB_DIR/hadoop-spatial-commons.jar:$API_LIB_DIR/sdospark-vector.jar:$API_LIB_DIR/sdohadoop-vector.jar:$API_LIB_DIR/sdohadoop-vector-hive.jar:$API_LIB_DIR/sdoapi.jar:$API_LIB_DIR/sdoutl.jar:$API_LIB_DIR/ojdbc8.jar

  10. Additionally, we will use a variable to set the Spark configuration that customizes the log output to make it less verbose.

export SPARK_JAVA_OPTIONS=" -Dlog4j.configuration=file:///opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/light-log4j.properties"

 

  11. Now run the job by entering the following command in the terminal:

spark-submit --class hol.NearestNeighbors --driver-java-options "$SPARK_JAVA_OPTIONS" --driver-class-path $SPARK_DRIVER_CLASS_PATH --jars $SPARK_LIB_JARS /opt/oracle/oracle-spatial-graph/spatial/vector/Spark-HOL/jlib/vector_examples.jar /user/oracle/Spark-HOL/tweets.json

 

  12. The ten results are displayed. Note that the first element displayed is a double representing the distance in meters between the center of Paris and the tweet. The resulting records have the same properties as in the input file tweets.json and, as specified, have more than 50 followers.

 

Concluding comments

Thank you for participating in this hands-on lab. More examples are available under the folders /opt/oracle/oracle-spatial-graph/spatial/vector/examples and /opt/oracle/oracle-spatial-graph/spatial/raster/examples. More information can also be found in the following page:

http://www.oracle.com/technetwork/database/database-technologies/bigdata-spatialandgraph/overview/index.html