Testing Ignite with Spark-shell
Starting up the cluster
Here we will briefly cover how to start up the Spark and Ignite clusters. Refer to the Spark documentation for more details.
For testing, you will need a Spark master process and at least one Spark worker. Usually the Spark master and workers run on separate machines, but for test purposes you can start a worker on the same machine as the master.
- Download and unpack the Spark binary distribution to the same location (let it be SPARK_HOME) on all nodes.
- Download and unpack the Ignite binary distribution to the same location (let it be IGNITE_HOME) on all nodes.
- On the master node, cd to $SPARK_HOME and run the following command:
    sbin/start-master.sh
  The script should output the path to the log file of the started process. Check the log file for the master URL, which has the following format:
    spark://master_host:master_port
  Also check the log file for the Web UI URL (usually it is http://master_host:8080).
- On each of the worker nodes, cd to $SPARK_HOME and run the following command:
    bin/spark-class org.apache.spark.deploy.worker.Worker spark://master_host:master_port
  where spark://master_host:master_port is the master URL you grabbed from the master log file. After the workers have started, check the master Web UI: it should show all of your workers registered with status ALIVE.
- On each of the worker nodes, cd to $IGNITE_HOME and start an Ignite node by running the following command:
    bin/ignite.sh
With the default configuration, the Ignite nodes should discover each other. The default configuration uses multicast discovery, so if your network does not allow multicast traffic, you will need to change the default configuration file and configure TCP discovery.
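The Ignite clients that IgniteContext starts from spark-shell (see the next section) rely on the same multicast discovery when they are created with `new IgniteConfiguration()`. The sketch below, which assumes static TCP discovery is what your network needs, swaps in `TcpDiscoverySpi` with a `TcpDiscoveryVmIpFinder` inside the configuration closure passed to IgniteContext. The host names `worker1` and `worker2` are hypothetical placeholders, and the port range `47500..47509` is an assumption built around Ignite's default discovery port 47500. The Ignite nodes started with `bin/ignite.sh` still need an equivalent change in their own configuration file. Paste it into spark-shell, where `sc` is predefined.

```scala
// Paste into spark-shell started with the Ignite packages; `sc` is predefined there.
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import scala.collection.JavaConverters._

// The closure is shipped to wherever IgniteContext starts an Ignite client,
// so the configuration is built inside it rather than captured from the shell.
val ic = new IgniteContext(sc, () => {
  // Static IP finder: a fixed address list used instead of multicast.
  // "worker1"/"worker2" are hypothetical hosts; 47500..47509 is an assumed port range.
  val ipFinder = new TcpDiscoveryVmIpFinder()
  ipFinder.setAddresses(Seq("worker1:47500..47509", "worker2:47500..47509").asJava)

  // TCP discovery SPI that uses the static IP finder above.
  val discoverySpi = new TcpDiscoverySpi()
  discoverySpi.setIpFinder(ipFinder)

  val cfg = new IgniteConfiguration()
  cfg.setDiscoverySpi(discoverySpi)
  cfg
})
```

Building the configuration programmatically keeps the example free of the ignite-spring dependency; loading an XML file with the same discovery settings is the alternative described in the next section.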
Working with Spark-Shell
Now that you have your cluster up and running, you can run spark-shell and check the integration.
- Start the Spark shell, in one of two ways:
  - Either by providing Maven coordinates of the Ignite artifacts (you can use --repositories if you need to, but it may be omitted):
      ./bin/spark-shell --packages org.apache.ignite:ignite-spark:1.8.0 --master spark://master_host:master_port --repositories http://repo.maven.apache.org/maven2/org/apache/ignite
  - Or by providing paths to the Ignite JAR files using the --jars parameter:
      ./bin/spark-shell --jars path/to/ignite-core.jar,path/to/ignite-spark.jar,path/to/cache-api.jar,path/to/ignite-log4j.jar,path/to/log4j.jar --master spark://master_host:master_port
  You should see the Spark shell start up.
  Note that if you are planning to use Spring configuration loading (for example, creating IgniteContext from an XML configuration file), you will need to add the ignite-spring dependency as well:
      ./bin/spark-shell --packages org.apache.ignite:ignite-spark:1.8.0,org.apache.ignite:ignite-spring:1.8.0 --master spark://master_host:master_port
- Let's create an instance of IgniteContext using the default configuration:
      import org.apache.ignite.spark._
      import org.apache.ignite.configuration._

      val ic = new IgniteContext(sc, () => new IgniteConfiguration())
  You should see something like:
      ic: org.apache.ignite.spark.IgniteContext = org.apache.ignite.spark.IgniteContext@62be2836
  An alternative way to create an instance of IgniteContext is to use a configuration file. Note that if the path to the configuration file is specified in relative form, the IGNITE_HOME environment variable must be set globally in the system, because the path is resolved relative to IGNITE_HOME:
      import org.apache.ignite.spark._
      import org.apache.ignite.configuration._

      val ic = new IgniteContext(sc, "examples/config/spark/example-shared-rdd.xml")
- Let's now create an instance of IgniteRDD using the "partitioned" cache in the default configuration:
      val sharedRDD = ic.fromCache[Integer, Integer]("partitioned")
  You should see an instance of RDD created for the partitioned cache:
      sharedRDD: org.apache.ignite.spark.IgniteRDD[Integer,Integer] = IgniteRDD[0] at RDD at IgniteAbstractRDD.scala:27
  Note that creating the RDD is a local operation and will not create a cache in the Ignite cluster.
- Let's now actually ask Spark to do something with our RDD, for example, get all pairs whose value is less than 10:
      sharedRDD.filter(_._2 < 10).collect()
  As our cache has not been filled yet, the result will be an empty array:
      res0: Array[(Integer, Integer)] = Array()
  Check the logs of the remote Spark workers to see how the Ignite context starts clients on all remote workers in the cluster. You can also start command-line Visor and check that the "partitioned" cache has been created.
- Let's now save some values into Ignite:
      sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i)))
  After running this command, you can check with command-line Visor that the cache size is 100000 elements.
- We can now check how the state we created survives a job restart, since the cache data is held by the Ignite nodes started with bin/ignite.sh rather than by the Spark shell. Shut down the Spark shell and repeat the first three steps (start the shell, create the IgniteContext, and create the IgniteRDD). You should again have an instance of Ignite context and an RDD for the "partitioned" cache. We can now check how many keys in our RDD have a value greater than 50000:
      sharedRDD.filter(_._2 > 50000).count
  Since we filled up the cache with a sequence of numbers from 1 to 100000 inclusive, we should see 50000 as a result:
      res0: Long = 50000
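The expected results in the steps above can be checked without a cluster. The sketch below is a plain-Scala model, not Spark or Ignite code (the object `SharedRddModel` and its methods are hypothetical names), that builds the same key/value pairs passed to savePairs and applies the same predicates locally. Before the cache is filled, the `< 10` filter sees no entries; afterwards, exactly the values 50001 to 100000 pass the `> 50000` filter, which is 100000 - 50000 = 50000 entries.

```scala
// Plain-Scala model of the spark-shell session: no Spark or Ignite involved.
object SharedRddModel {
  // The pairs passed to sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i))).
  val savedPairs: Seq[(Int, Int)] = (1 to 100000).map(i => (i, i))

  // Local counterpart of sharedRDD.filter(_._2 < threshold).collect().
  def valuesBelow(pairs: Seq[(Int, Int)], threshold: Int): Seq[(Int, Int)] =
    pairs.filter(_._2 < threshold)

  // Local counterpart of sharedRDD.filter(_._2 > threshold).count.
  def countAbove(pairs: Seq[(Int, Int)], threshold: Int): Long =
    pairs.count(_._2 > threshold).toLong

  def main(args: Array[String]): Unit = {
    // Empty cache, as in the first query: no pairs match.
    println(valuesBelow(Seq.empty, 10).isEmpty)
    // Filled cache: values 1 to 9 are below 10.
    println(valuesBelow(savedPairs, 10).size)
    // Filled cache: values 50001 to 100000 are above 50000.
    println(countAbove(savedPairs, 50000))
  }
}
```

Running `SharedRddModel.main` prints `true`, `9` and `50000`. The real IgniteRDD evaluates the same predicates in parallel over the cache partitions; the model only reproduces the arithmetic behind the expected output.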
