Apache Spark is a distributed computing framework with implicit parallelism and fault tolerance. Ampool ADS is a distributed in-memory store with built-in fault tolerance. Spark typically caches data in executor memory for faster processing and falls back to the underlying disk-based stores as needed. The fetched data is immutable; it is typically persisted to disk-based stores and re-read later, either by the same job or by other Spark jobs. Ampool ADS complements Spark by providing in-memory tables that Spark can access and process directly. This removes the need to cache large immutable data inside individual Spark jobs, where the cache is local to the job: data held in Ampool ADS is global and can be accessed by any Spark job at any time.
Ampool provides seamless integration between its memory tables and Spark DataFrames. A Spark DataFrame is a distributed collection of data organized into named columns, similar to an Ampool memory table. Spark also provides SQL-like query functionality through the DataFrame API (such as filters and aggregations), which Ampool supports transparently on the data stored in Ampool ADS. Apart from loading the data-frames from Ampool ADS, no change is required on the client side.
The diagram below shows the deployment of an Ampool ADS cluster alongside a Spark cluster. Although a Spark cluster can access a remote Ampool ADS, it is recommended to co-locate the applications (Spark workers) with the respective Ampool ADS nodes. Once Ampool ADS is configured as a data store, it handles optimal access and efficiency, so the application does not need to manage them.
A Spark cluster is typically deployed either in standalone mode or in an existing cluster managed by YARN or Mesos. A Spark cluster that uses Ampool ADS as an in-memory data store can take advantage of data locality if both are co-located on the same nodes.
The Ampool-Spark connector jars, along with their dependent libraries, must be provided when launching Spark jobs, whether through the Spark shell or by submitting Spark applications to the cluster. When running Spark applications, the only mandatory configuration is the Ampool cluster locator details: the locator host name and the locator port. Spark distributes the connector jars to all executors; this distribution step can be avoided if the required connector jars are already installed on the Spark worker nodes.
The Ampool-Spark connector enables you to:
Ampool ADS, via the connector, supports the following data types as column types. These types are mapped to the corresponding Spark data-frame types.
Ampool also supports the following complex types, which contain one or more of the above basic types:
In addition to the above types, the Spark ML Vector user-defined type is supported as a native type by the connector. Both sparse and dense vectors can be read from and saved to Ampool like any other basic type.
Once all the required libraries are available and the cluster is running, Spark jobs can be launched (via spark-shell or spark-submit) as shown below:
$ <spark-home>/bin/spark-shell --jars <ampool-spark-jar>,<ampool-client-dependencies-jar>
The Ampool cluster (i.e. locator) details can be provided via the following properties:
This configuration, or any other configuration attribute, can be provided as a map of key-value pairs, as shown below:
scala> val options = Map(("ampool.locator.host", "localhost"), ("ampool.locator.port", "10334"))
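Because the same two locator properties recur in every example, it can help to build the map in one place. The following is a minimal sketch of such a helper; the object name AmpoolOptions and its validation rules are illustrative assumptions and are not part of the Ampool connector. Only the two property names come from the text above.

```scala
// Hypothetical helper, not part of the Ampool connector: builds the option
// map expected by the examples from a locator host and port.
object AmpoolOptions {
  // Property names as used in the option map shown above.
  val LocatorHost = "ampool.locator.host"
  val LocatorPort = "ampool.locator.port"

  def apply(host: String, port: Int): Map[String, String] = {
    // Fail early on obviously invalid input instead of at connection time.
    require(host.trim.nonEmpty, "locator host must not be empty")
    require(port > 0 && port <= 65535, s"invalid locator port: $port")
    // Spark data source options are string-valued, so the port is stringified.
    Map(LocatorHost -> host.trim, LocatorPort -> port.toString)
  }
}
```

In spark-shell, the object can be entered with :paste, after which AmpoolOptions("localhost", 10334) yields the same map as the literal above.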
The following examples demonstrate how to save an existing data-frame to Ampool or load an existing Ampool table as a data-frame in Spark.
To save an existing DataFrame as an Ampool table, execute the following commands (from spark-shell).
scala> val myDataFrame: DataFrame = ...
scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> myDataFrame.write.format("io.ampool").options(options).save("ampool_table")
The above commands create an ordered Ampool MTable called ampool_table in Ampool ADS (using localhost and 10334 as the locator host and port respectively). The schema of the created table is equivalent to the schema of myDataFrame.
If you have JSON data as a source, it can be converted to a data-frame and saved to Ampool ADS as a table.
scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> val jsonDataFrame = sqlContext.read.json(path)
scala> jsonDataFrame.write.format("io.ampool").options(options).save("json_table")
If you already have an Ampool table, it can be loaded as a data-frame in Spark (from spark-shell):
scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> val ampoolDataFrame = sqlContext.read.format("io.ampool").options(options).load("ampool_table")
scala> ampoolDataFrame.show()
scala> ampoolDataFrame.filter("size > 4096").show()
The above commands create a Spark data-frame from the existing ampool_table. Once the table is loaded as a data-frame, any supported data-frame operation can be executed on it; each operation is translated into the corresponding retrieval of data from Ampool ADS as required.
Once the Ampool table is loaded as a data-frame, you can register it as a temporary table and then use Spark SQL to query data from Ampool ADS (from spark-shell):
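The same save and load calls can also be used from a standalone application submitted with spark-submit. The following is a minimal sketch, assuming the SQLContext-based API that spark-shell exposes as sqlContext in the examples here; the FileInfo record, the sample rows, and the table name file_info are hypothetical and serve only as illustration. The master URL and the connector jars are expected to be supplied on the spark-submit command line.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type used only for this illustration.
case class FileInfo(name: String, size: Long)

object AmpoolRoundTrip {
  // Illustrative sample rows; not data from any real table.
  val sampleRows: Seq[FileInfo] =
    Seq(FileInfo("a.log", 1024L), FileInfo("b.log", 8192L))

  def main(args: Array[String]): Unit = {
    // No master is set here; spark-submit provides it.
    val sc = new SparkContext(new SparkConf().setAppName("AmpoolRoundTrip"))
    val sqlContext = new SQLContext(sc)

    // Locator details, using the property names from the examples above.
    val options = Map(
      "ampool.locator.host" -> "localhost",
      "ampool.locator.port" -> "10334")

    // Save: the DataFrame schema (name, size) determines the table schema.
    val df = sqlContext.createDataFrame(sampleRows)
    df.write.format("io.ampool").options(options).save("file_info")

    // Load: read the table back and apply an ordinary DataFrame filter.
    val loaded = sqlContext.read.format("io.ampool").options(options).load("file_info")
    loaded.filter("size > 4096").show()

    sc.stop()
  }
}
```

Running this requires a reachable Ampool locator at the configured host and port; how the connector behaves when the target table already exists is not covered here.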
scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> sqlContext.read.format("io.ampool").options(options).load("ampool_table").registerTempTable("my_table")
scala> val results = sqlContext.sql("select * from my_table where size = 4096")
scala> results.show()
scala> println(results.count())
scala> results.foreach(println)
You can also use custom Spark UDFs with the Ampool connector. Register the custom UDF, say SampleEqualsUDF, with the SQL context along with the appropriate return type. Once registered, it can be used like any built-in UDF in SQL queries, as shown below:
scala> val df = sqlContext.read.format("io.ampool").load("ampool_table")
scala> df.registerTempTable("my_table")
scala> sqlContext.udf.register("MyEquals", SampleEqualsUDF.SAMPLE_EQUALS_UDF, DataTypes.BooleanType)
scala> sqlContext.sql("select * from my_table where MyEquals(column_1, 'constant_value')").show()
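The text names SampleEqualsUDF but does not show its definition. The following is one possible sketch, assuming it is a two-argument string comparison implemented against Spark's Java UDF interface (UDF2), which is the form accepted by a register call that takes an explicit return type. The body and the null handling are assumptions made for illustration.

```scala
import org.apache.spark.sql.api.java.UDF2

// Hypothetical definition: the original text only names SampleEqualsUDF.
object SampleEqualsUDF {
  // Returns true when the column value equals the given constant.
  // A null column value never matches.
  val SAMPLE_EQUALS_UDF: UDF2[String, String, java.lang.Boolean] =
    new UDF2[String, String, java.lang.Boolean] {
      override def call(columnValue: String, constant: String): java.lang.Boolean =
        java.lang.Boolean.valueOf(columnValue != null && columnValue == constant)
    }
}
```

With this object on the classpath, and org.apache.spark.sql.types.DataTypes imported, the register call above binds it to the SQL function name MyEquals.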