Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

12.5 通过RDD分析电影点评系统实现Java和Scala版本的二次排序系统

本节实现RDD分析大数据电影点评系统的二次排序功能,分别使用Java和Scala语言来实现。在实现大数据电影点评系统的二次排序前,我们先实现一个Java版本二次排序的例子,来阐述二次排序的实现。

数据准备:在data/moviedata/medium/目录下新建文本文件dataforsecondarysorting.txt,在文本文件中输入如下测试数据。

1.  2  4
2.  2  10
3.  3  6
4.  1  5

文本文件dataforsecondarysorting.txt中有两列数字,二次排序的含义是先按照第一列数字进行排序,如果第一列数字中有相同的数字,然后再按照第二列的数字进行第二次排序。如上述文本文件,进行二次排序的结果如下:

1.  3  6
2.  2  10
3.  2  4
4.  1  5

编写MovieUsersAnalyzerTest.java及SecondarySortingKey.java实现对dataforsecondarysorting.txt的文本文件内容进行二次排序。SecondarySortingKey.java是自定义Key值类。MovieUsersAnalyzerTest.java中实现了二次排序功能。

12.5.1 二次排序自定义Key值类实现(Java)

Spark中可以使用sortByKey算子对数据的Key值进行排序,本节中,我们须进行二次排序,二次排序的关键是自定义Key值,在自定义Key中实现排序的功能。这里将每一行数据的第一个数据、第二个数据组合成一个Key值,然后在自定义Key中实现二次排序。同样的思路,如果业务需求需实现三次排序、四次排序,甚至更多维度的排序,重点也是自定义Key值,将多维数据组合成为自定义Key,从而实现多维度的排序功能。

自定义Key值的Java类为SecondarySortingKey.java,在类中定义需要二次排序的组合key值:first、second,然后重写$greater、$greater$eq、$less、$less$eq、compare、compareTo方法,以及hashCode、equals方法。

compare的方法是比较自身对象this与要比较对象that的比较结果。compare方法用来确定如何将要排序的对象进行排序。假如返回的结果为x,那么

 x < 0表示this < that

 x == 0表示this == that

 x > 0表示this > that

在SecondarySortingKey.java的重载方法compareTo中,分别将两个SecondarySortingKey对象进行比较,例如,dataforsecondarysorting.txt的第一行(3 6)组合成Key的this对象,它将与第二行的(2 10)组合成Key的that对象进行比较,首先比较this和that的第一列数字,this对象的第一列数字是3,比that对象的第一列数字2大,因此相减值大于0,表示this(3 6)对象比that对象(2 10)大。如果this对象和that对象的第一列数字相等,那么再比较第二列数字的大小,进行二次排序。例如,Key(2 10)与Key(2 4)第一列数字相等的情况下,再比较第二列数字,10比4要大,因此Key(2 10)、Key(2 4)二次排序以后,Key(2 10)比Key(2 4)大。

方法$greater、$greater$eq、$less、$less$eq、compare是同样的思路。以下是Secondary SortingKey.java的重载方法compare的代码。

SecondarySortingKey.java的完整代码如下:

12.5.2 电影点评系统二次排序功能实现(Java)

我们已经实现了SecondarySortingKey.java自定义Key值类,接下来先编写一个Java测试类MovieUsersAnalyzerTest.java,验证自定义Key值二次排序的功能。验证通过以后,只需将读入的dataforsecondarysorting.txt文本文件修改为读入电影点评系统文件,重新组合成电影点评系统的Key,就可将二次排序代码移植到电影点评系统代码中。

下面先实现Java测试类MovieUsersAnalyzerTest.java,具体实现思路如下。

(1)读入dataforsecondarysorting.txt每行的数据记录。

(2)对读入的数据进行mapToPair转换,格式为Key-Value。先将每行数据按空格进行单词切分,返回一个Key-Value键值对。Key为SecondarySortKey自定义类(每行数据切分后的第0个、第1个数据),value为每行的原数据。格式化Key-Value的结果,如((2 4),"2 4")。

(3)对keyvalues使用sortByKey算子按SecondarySortKey类型的Key值进行排序。通过key值SecondarySortKey的compareTo方法排序,实现了二次排序。

(4)上一步使用sortByKey算子进行Key值二次排序完毕,这里Key值不再需要了,我们使用map函数进行转换,直接返回Key-Value键值对的Value。例如,((2 4),"2 4")map转换以后返回结果为"2 4"。

(5)打印输出二次排序后的结果。

MovieUsersAnalyzerTest.java的完整代码如下:

在IDEA中运行代码,结果如下:

Java测试类MovieUsersAnalyzerTest.java已经测试验证通过,下面将读入的dataforsecondarysorting.txt文本文件修改为读入电影点评系统文件,重新组合成电影点评系统的Key,从而实现电影点评系统的二次排序功能。在MovieUsersAnalyzer.java中读入电影评分数据ratings.dat,对电影评分数据从时间、评分数二个维度进行排序,先按时间排序,然后按评分进行二次排序。

评级文件ratings.dat的格式描述如下:

1.  UserID::MovieID::Rating::Timestamp
2. 用户ID、电影ID、评分数据、时间戳

MovieUsersAnalyzer.java在MovieUsersAnalyzerTest.java的基础上修改:①修改读入文件ratings.dat;②读入的每行数据按“::”分隔符进行分割;③将时间戳、评分数据组合成Key值放入到SecondarySortingKey类。

MovieUsersAnalyzer.java修改的地方如下:

在IDEA中运行代码,显示结果“用户ID、电影ID、评分数据、时间戳”,先按时间戳排序,然后按评分排序,打印输出结果如下:

12.5.3 二次排序自定义Key值类实现(Scala)

之前我们已使用Java实现了RDD分析大数据电影点评系统的二次排序功能,本节使用Scala语言来实现。在Movie_Users_Analyzer_RDD.scala代码文件中对电影评分数据进行二次排序,以时间戳Timestamp和评分Rating两个维度降序排列。

评级文件ratings.dat的格式描述如下。

1.  UserID::MovieID::Rating::Timestamp
2. 用户ID、电影ID、评分数据、时间戳

对电影评分数据进行二次排序完成的功能是最近时间中最新发生的点评信息。Scala相对于Java代码实现更简洁优雅。具体实现方法如下。

(1)自定义SecondarySortKey类,在SecondarySortKey类构造函数中传入first、second两个参数。SecondarySortKey类继承自Ordered排序接口及序列化器。

(2)在SecondarySortKey类中定义了compare方法,类似Java的二次排序方法,this、other的比较和之前Java类中this、that的比较类似,这里不再赘述。和Java中代码实现不同的是,Scala代码中引入了Math.ceil及Math.floor方法。

 Math.ceil方法是向上取整计算,它返回的是大于或等于函数参数,并且与之最接近的整数。

 Math.floor方法是求一个最接近它的整数,它的值小于或等于这个浮点数。

为什么这里需要调用Math.ceil、Math.floor方法呢?原因是this对象和other对象比较的结果是一个Int整数。如果计算返回值是正小数,就需要实现大于0的小数,向上取整返回1,表示第一个对象大于第二个对象;如果计算返回值是负小数,就需要实现小于0的小数向下取整返回-1,表示第一个对象小于第二个对象。实现的技巧就是通过Math.ceil、Math.floor方法来实现的。

compare的方法是比较自身对象this与要比较对象other的比较结果。compare方法用来确定如何将要排序的对象进行排序。假如计算this.second - other.second的值,那么:

 this.second - other.second = 0.5返回值取1,即取值Math.ceil(0.5)。

 this.second - other.second = -0.5返回值取-1,即取值Math. floor(-0.5)。

SecondarySortKey的代码如下:

12.5.4 电影点评系统二次排序功能实现(Scala)

接下来使用Scala实现电影点评系统的二次排序功能,实现对电影评分数据以时间戳Timestamp和评分Rating两个维度进行二次降序排列。

评级文件ratings.dat的格式描述如下:

1.  UserID::MovieID::Rating::Timestamp
2. 用户ID、电影ID、评分数据、时间戳

具体实现方法如下。

(1)读入电影点评系统文件,之前已读入ratings.dat文件生成ratingsRDD。使用map函数对ratingsRDD进行转换,将每行数据按“::”分割,取到第3个元素时间戳,第2个元素评分数据,将时间戳、评分数据组合成为Key值,每行数据为Value值,形成格式化Key-Value键值对。格式为:((时间戳、评分数据),“用户ID、电影ID、评分数据、时间戳”)。

(2)使用sortByKey算子按(时间戳、评分数据)Key值排序。

(3)sortByKey排序完毕,对于排序后的每行的元组数据((时间戳、评分数据),“用户ID、电影ID、评分数据、时间戳”),取出元组的第2个元素,即“用户ID、电影ID、评分数据、时间戳”。

(4)使用take算子取出前10个数据,打印输出结果。

Movie_Users_Analyzer_RDD.scala中电影点评系统二次排序功能实现代码如下:

在IDEA中运行代码,先按第4列时间戳排序,再按第3列评分数据排序,二次排序的结果如下: