Deep Dive with MTable

February 7, 2017|by: Avinash Dongre

Continuing from the previous post, in this post we will demonstrate some of the APIs used to create and access data in MTable. The example below uses the Java APIs to create and populate the data, and the Spark DataFrame/SQL API to query the data from the MTable. Note: Spark SQL also supports Hive query syntax and UDFs.

Use case

Let us use a sample data set providing information on various hotels, including their pricing, amenities, and customer reviews. We will use it to build a portal that can find hotels in a given price range, fetch the reviews for a particular hotel, list hotels by price range and rating, and so on. The sample data can be downloaded from the TripAdvisor Data Set. The complete example code is available in Ampool’s public git repository.

Let’s define two tables, HotelInfoTable and RatingReviewTable, with the schemas given below.

HotelInfoTable

Field              Type
name               String
hotelURL           String
priceRangeStart    Double
priceRangeStop     Double
address            String
HotelID            Integer
imgURL             String

RatingReviewTable

Field                  Type
contents               String
date                   Date
reviewId               String
author                 String
service                Integer
business_service       Integer
cleanliness            Integer
check_in_front_desk    Integer
overall                Double
value                  Integer
rooms                  Integer
location               Integer
HotelID                Integer

 

Let’s get to the code

Step 1 : Setup Ampool Cluster

At a minimum, you are expected to have a single-node Ampool cluster and a Spark cluster installed. The Spark Scala shell will be used to run the Spark queries on data frames pointing to the underlying Ampool tables.

The Ampool Data Store can be downloaded from the Ampool website.

Use Java JDK 1.8

Step 2 : Java Client: Create Client Cache
The following code creates a client cache, which is actually a handle to Ampool’s server-side memory pool. Although client-side caching is possible, it does not implicitly cache any server-side table data on the client (so “client cache” is a bit of a misnomer here). To get the handle to Ampool’s server-side memory pool, create an MConfiguration object with Ampool’s locator host and port number.

MConfiguration mconf = MConfiguration.create();
mconf.set(Constants.MonarchLocator.MONARCH_LOCATOR_ADDRESS, "127.0.0.1");
mconf.setInt(Constants.MonarchLocator.MONARCH_LOCATOR_PORT, 10334);
MClientCache clientCache = new MClientCacheFactory().create(mconf);

Step 3 : Java Client: Define an MTableDescriptor
In the Ampool Active Data Store, the MTableDescriptor class defines the MTable schema and sets additional properties such as the table type, persistence policy, max versions for each row, number of table splits (data buckets), split range, etc. See the Online Documentation for more details.

In this case we use an MTable of type UNORDERED for both tables, creating one descriptor per table.

// Descriptor for HotelInfoTable
MTableDescriptor hotelInfoDescriptor = new MTableDescriptor(MTableType.UNORDERED);
hotelInfoDescriptor.enableDiskPersistence(MDiskWritePolicy.ASYNCHRONOUS);

hotelInfoDescriptor.addColumn("name", MBasicObjectType.STRING);
hotelInfoDescriptor.addColumn("hotelURL", MBasicObjectType.STRING);
hotelInfoDescriptor.addColumn("priceRangeStart", MBasicObjectType.DOUBLE);
hotelInfoDescriptor.addColumn("priceRangeStop", MBasicObjectType.DOUBLE);
hotelInfoDescriptor.addColumn("address", MBasicObjectType.STRING);
hotelInfoDescriptor.addColumn("HotelID", MBasicObjectType.INT);
hotelInfoDescriptor.addColumn("imgURL", MBasicObjectType.STRING);

// Descriptor for RatingReviewTable
MTableDescriptor ratingReviewDescriptor = new MTableDescriptor(MTableType.UNORDERED);
ratingReviewDescriptor.enableDiskPersistence(MDiskWritePolicy.ASYNCHRONOUS);

ratingReviewDescriptor.addColumn("contents", MBasicObjectType.STRING);
ratingReviewDescriptor.addColumn("date", MBasicObjectType.DATE);
ratingReviewDescriptor.addColumn("reviewId", MBasicObjectType.STRING);
ratingReviewDescriptor.addColumn("author", MBasicObjectType.STRING);
ratingReviewDescriptor.addColumn("service", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("business_service", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("cleanliness", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("check_in_front_desk", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("overall", MBasicObjectType.DOUBLE);
ratingReviewDescriptor.addColumn("value", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("rooms", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("location", MBasicObjectType.INT);
ratingReviewDescriptor.addColumn("HotelID", MBasicObjectType.INT);

Step 4 : Java Client: Creating Tables
In the Ampool Data Store, creating a table is an admin operation. The admin handle can be obtained using the clientCache.getAdmin() API.

MTable ratingReviewTable = clientCache.getAdmin().createTable("RatingReviewTable", ratingReviewDescriptor);
MTable hotelInfoTable = clientCache.getAdmin().createTable("HotelInfoTable", hotelInfoDescriptor);

Step 5 : Java Client: Populating Tables
The MPut class is used to define the row key and column values for each record to be inserted into an MTable. MPut objects can also be batched as a List, so a whole batch of records can be inserted in a single put operation.

The following code shows how to populate a single record in each of “HotelInfoTable” and “RatingReviewTable”; a batched variant is sketched after these snippets. You can use a loop to insert multiple records.

// Populate HotelInfoTable: create an MPut object with the row key
MPut hotelPut = new MPut(Bytes.toBytes(hotelInfoObj.getHotelID()));

// Add columns with column name and column value
hotelPut.addColumn("name", hotelInfoObj.getName());
hotelPut.addColumn("hotelURL", hotelInfoObj.getHotelURL());
hotelPut.addColumn("priceRangeStart", hotelInfoObj.getPriceRangeStart());
hotelPut.addColumn("priceRangeStop", hotelInfoObj.getPriceRangeStop());
hotelPut.addColumn("address", hotelInfoObj.getAddress());
hotelPut.addColumn("HotelID", hotelInfoObj.getHotelID());
hotelPut.addColumn("imgURL", hotelInfoObj.getImgURL());

// Perform the put operation
hotelInfoTable.put(hotelPut);

// Populate RatingReviewTable: create an MPut object with the row key
MPut reviewPut = new MPut(reviewInfo.getReviewId());

// Add columns with column name and column value
reviewPut.addColumn("contents", reviewInfo.getContent());

// Parse the review date string (format "MMMM d, yyyy") into a java.sql.Date
SimpleDateFormat sdf = new SimpleDateFormat("MMMM d, yyyy");
final Date date = sdf.parse(reviewInfo.getDate());
reviewPut.addColumn("date", new java.sql.Date(date.getTime()));

reviewPut.addColumn("reviewId", reviewInfo.getReviewId());
reviewPut.addColumn("author", reviewInfo.getAuthor());

RatingsInfo ratingsInfo = reviewInfo.getRatingsInfo();
reviewPut.addColumn("service", ratingsInfo.getService());
reviewPut.addColumn("business_service", ratingsInfo.getBusiness_service());
reviewPut.addColumn("cleanliness", ratingsInfo.getCleanliness());
reviewPut.addColumn("check_in_front_desk", ratingsInfo.getCheck_in_front_desk());
reviewPut.addColumn("overall", ratingsInfo.getOverall());
reviewPut.addColumn("value", ratingsInfo.getValue());
reviewPut.addColumn("rooms", ratingsInfo.getRooms());
reviewPut.addColumn("location", ratingsInfo.getLocation());

reviewPut.addColumn("HotelID", hotelInfoObj.getHotelID());

// Perform the put operation
ratingReviewTable.put(reviewPut);
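
The snippets above insert one record at a time. As mentioned, MPut objects can also be batched as a List and inserted with a single put operation. Below is a minimal sketch of that pattern; the HotelInfo type and the hotelInfoList collection stand in for the example code’s parsed records, and the exact List-accepting put overload should be confirmed against the Ampool API docs.

// Illustrative batched insert (assumes java.util.List/ArrayList and that
// MTable.put(...) accepts a List<MPut>; confirm the exact overload in the
// Ampool API documentation).
List<MPut> batch = new ArrayList<>();
for (HotelInfo info : hotelInfoList) {
    MPut record = new MPut(Bytes.toBytes(info.getHotelID()));
    record.addColumn("name", info.getName());
    record.addColumn("HotelID", info.getHotelID());
    // ... add the remaining HotelInfoTable columns as in the snippet above ...
    batch.add(record);
}
hotelInfoTable.put(batch);   // one put call for the whole batch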

Step 6 : Querying MTable using Spark
Let’s see how one can point Spark data frames at the Ampool MTables and query the data using Spark SQL.

Ampool versions 1.1 and 1.2 support Spark 1.6.1.

final String locatorHost = "localhost";
final int locatorPort = 10334;

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkQueryRunner");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);
Map<String, String> options = new HashMap<>(3);
options.put("ampool.locator.host", locatorHost);
options.put("ampool.locator.port", String.valueOf(locatorPort));

// HOTEL_INFO_TABLE and RATING_REVIEW_TABLE hold the table names created above
// ("HotelInfoTable" and "RatingReviewTable").
DataFrame hotelInfoDF = sqlContext.read().format("io.ampool").options(options).load(HOTEL_INFO_TABLE);
DataFrame ratingReviewDF = sqlContext.read().format("io.ampool").options(options).load(RATING_REVIEW_TABLE);

hotelInfoDF.show();
ratingReviewDF.show();

hotelInfoDF.registerTempTable("HI");
ratingReviewDF.registerTempTable("RR");

sqlContext.sql("select name from HI").show();
sqlContext.sql("select name, priceRangeStart from HI where priceRangeStart > 100").show();

final DataFrame dataFrame = sqlContext.sql("select first(HI.HotelID) AS HOTELID, first(HI.name) AS NAME, AVG(RR.overall) as RATING, first(HI.priceRangeStart) AS STARTPRICE, first(HI.priceRangeStop) AS STOPPRICE from HI, " +
"RR where HI.HotelID = RR.HotelID and HI.name != 'null' and HI.priceRangeStart > 150.0 and HI.priceRangeStop < 200.0 and HI.priceRangeStart < HI.priceRangeStop " +
"group by HI.HotelID order by RATING DESC");

dataFrame.filter("RATING > 3.0 and RATING < 5.0").show();
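
As noted in the introduction, Spark SQL also supports Hive query syntax and UDFs. The snippet below is a minimal sketch of using a UDF with the same sqlContext and the HI temp table registered above; the priceBand UDF and its price thresholds are made up for illustration and are not part of the example repository.

// Register an illustrative UDF that labels a hotel by its starting price.
sqlContext.udf().register("priceBand",
    (org.apache.spark.sql.api.java.UDF1<Double, String>) price ->
        price == null ? "unknown" : (price < 100 ? "budget" : (price < 200 ? "mid-range" : "premium")),
    org.apache.spark.sql.types.DataTypes.StringType);

// Use the UDF in a Spark SQL query against the HI temp table.
sqlContext.sql("select name, priceRangeStart, priceBand(priceRangeStart) as band from HI").show();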

The complete example code is located here. You will need to update the variable ‘DATA_DIRECTORY’ in io.ampool.MainRunner.java to point to the location where you downloaded the TripAdvisor data.

So give it a try!

 
