Learning Salesforce Einstein
上QQ阅读APP看书,第一时间看更新

Data

In the DASE architecture, data is prepared by two components sequentially--Data Source and Data Preparator.

Data Source takes data from the data store and prepares an RDD [Rating] for the ALS algorithm. Consider the following diagram:

The Data Preparator sample looks as follows:

   package org.template.recommendation;

import org.apache.predictionio.controller.java.PJavaPreparator;
import org.apache.spark.SparkContext;

public class Preparator extends PJavaPreparator<TrainingData,
PreparedData> {

@Override
public PreparedData prepare(SparkContext sc,
TrainingData trainingData) {
return new PreparedData(trainingData);
}
}

The PreparedData class returns TrainingData.

The sample Java class for DataSource consists of the method named readTraining(), which can read the event's and create Training Data based on those events.

The sample Java code looks as follows:

    public class DataSource extends PJavaDataSource<TrainingData, 
EmptyParams, Query, Set<String>> {

private final DataSourceParams dsp;

public DataSource(DataSourceParams dsp) {
this.dsp = dsp;
}

@Override
public TrainingData readTraining(SparkContext sc) {
JavaRDD<UserItemEvent> viewEventsRDD = PJavaEventStore.find(
dsp.getAppName(),
OptionHelper.<String>none(),
OptionHelper.<DateTime>none(),
OptionHelper.<DateTime>none(),
OptionHelper.some("user"),
OptionHelper.<String>none(),
OptionHelper.some(Collections.singletonList("view")),
OptionHelper.<Option<String>>none(),
OptionHelper.<Option<String>>none(),
sc)
.map(new Function<Event, UserItemEvent>() {
@Override
public UserItemEvent call(Event event) throws Exception {
return new UserItemEvent(event.entityId(),
event.targetEntityId().get(),
event.eventTime().getMillis(), UserItemEventType.VIEW);
}
});
}
--
return new TrainingData(usersRDD, itemsRDD, viewEventsRDD,
buyEventsRDD);
}

The key things to note from the preceding code are as follows:

Scala version of the preceding code is shown next.

The Preparator class sample is as follows:

    package MyRecommedationScala

import org.apache.predictionio.controller.PPreparator

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

class Preparator extends PPreparator[TrainingData, PreparedData] {
def prepare(sc: SparkContext, trainingData: TrainingData):
PreparedData = {
new PreparedData(
users = trainingData.users,
items = trainingData.items,
viewEvents = trainingData.viewEvents,
buyEvents = trainingData.buyEvents)
}
}

class PreparedData(
val users: RDD[(String, User)],
val items: RDD[(String, Item)],
val viewEvents: RDD[ViewEvent],
val buyEvents: RDD[BuyEvent]
) extends Serializable

The DataSource class reads the events and creates TrainingData based on the following events:

    class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
EmptyEvaluationInfo, Query, EmptyActualResult] {

@transient lazy val logger = Logger[this.type]

override
def readTraining(sc: SparkContext): TrainingData = {

val eventsRDD: RDD[Event] = PEventStore.find(
appName = dsp.appName,
entityType = Some("user"),
eventNames = Some(List("view", "buy")),
// targetEntityType is optional field of an event.
targetEntityType = Some(Some("item")))(sc).cache()

val viewEventsRDD: RDD[ViewEvent] = eventsRDD
.filter { event => event.event == "view" }
.map { ... }

...

new TrainingData(...)
}
}