
Engineering RDD Programs

In the previous three exercises, every step was implemented in a single file, which does not work well for real project development. This section restructures the word-count job into a layered project.

1. Three-Layer Architecture

The code is organized into three layers: a controller layer that dispatches requests, a service layer that holds the business logic, and a DAO layer that handles data access. Shared behavior is factored into traits in a common package, and environment helpers go in utils.

2. Three-Layer Architecture in Practice

2.1 Create the Project

(Screenshot: creating a new project in the IDE.)

2.2 Create the Package Structure

Create the package com.rocket.spark.core.framework and add the following sub-packages under it: application, common, controller, dao, service, and utils.
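The sub-package names follow from the code in section 3; the resulting layout is:

```
com.rocket.spark.core.framework
├── application   // entry objects (WordCountApplication)
├── common        // shared traits (TApplication, TController, TService, TDao)
├── controller    // WordCountController
├── dao           // WordCountDao
├── service       // WordCountService
└── utils         // EnvUtil
```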

3. The Code

3.1 Entry Class

```scala
package com.rocket.spark.core.framework.application

import com.rocket.spark.core.framework.common.TApplication
import com.rocket.spark.core.framework.controller.WordCountController

object WordCountApplication extends App with TApplication {

    // start() builds the Spark environment, runs this block, then tears the environment down
    start() {
        val wordCountController = new WordCountController()
        wordCountController.dispatch()
    }
}
```
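Since both parameters in the first list have defaults, start() { ... } simply passes an empty first argument list. To run with a different master URL or application name, pass them explicitly; the values below are illustrative:

```scala
// Explicit configuration; "local[2]" and "WordCount" are example values
start("local[2]", "WordCount") {
    new WordCountController().dispatch()
}
```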

3.2 Controller Layer

```scala
package com.rocket.spark.core.framework.controller

import com.rocket.spark.core.framework.common.TController
import com.rocket.spark.core.framework.service.WordCountService

class WordCountController extends TController {

    private val countService = new WordCountService()

    override def dispatch(): Unit = {
        val array: Array[(String, Int)] = countService.analyzer()
        array.foreach(println)
    }
}
```

3.3 DAO Layer

```scala
package com.rocket.spark.core.framework.dao

import com.rocket.spark.core.framework.common.TDao

class WordCountDao extends TDao {
    // Inherits readFile from TDao; job-specific data access would go here
}
```

3.4 Service Layer

```scala
package com.rocket.spark.core.framework.service

import com.rocket.spark.core.framework.common.TService
import com.rocket.spark.core.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD

class WordCountService extends TService {

    private val wordCountDao = new WordCountDao()

    override def analyzer(): Array[(String, Int)] = {
        // Read the raw lines through the DAO
        val fileRdd: RDD[String] = wordCountDao.readFile("datas/1.txt")
        // Split each line into words and pair every word with a count of 1
        val wordRdd: RDD[(String, Int)] = fileRdd.flatMap(line => {
            line.split(" ").map((_, 1)).toList
        })
        // Sum the counts per word
        val resultRdd: RDD[(String, Int)] = wordRdd.reduceByKey(_ + _)
        // Collect the result back to the driver
        val array: Array[(String, Int)] = resultRdd.collect()
        array
    }
}
```
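Note that collect() pulls the entire result back to the driver. That is fine for a word count, but for large results you would typically write out with saveAsTextFile or sample with take instead of collecting everything.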

3.5 Common Layer

The common package defines one trait per layer, so each concrete class inherits its shared plumbing:

```scala
package com.rocket.spark.core.framework.common

import com.rocket.spark.core.framework.utils.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {

    /**
     * @param config  master URL; uses Scala's default parameter values
     * @param appName application name, also defaulted
     * @param op      a by-name block (Scala control abstraction): the caller's code is
     *                passed into the function and run inside the prepared environment;
     *                currying separates the optional configuration parameters from the
     *                code block, which is more flexible
     */
    def start(config: String = "local[*]", appName: String = "Application")(op: => Unit): Unit = {
        // 1. Prepare the environment
        val sparkConf: SparkConf = new SparkConf().setMaster(config).setAppName(appName)
        val sc = new SparkContext(sparkConf)
        EnvUtil.putSc(sc)

        try {
            op
        } catch {
            case ex: Exception => ex.printStackTrace()
        } finally {
            // 2. Always release the environment, even if the job failed
            sc.stop()
            EnvUtil.clear()
        }
    }
}
```
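start() combines two Scala features: default parameter values and a curried by-name parameter (op: => Unit). The caller's block is captured unevaluated and only runs where op appears inside the method body. Here is a minimal standalone sketch of the same pattern, using a hypothetical withTiming helper that is not part of the project:

```scala
// Control abstraction: `op` is by-name, so the caller's block executes
// where `op` is referenced in the body, not at the call site.
def withTiming(label: String = "job")(op: => Unit): Unit = {
    val begin = System.currentTimeMillis()
    op // the caller's block runs here
    println(s"$label took ${System.currentTimeMillis() - begin} ms")
}

// Called just like start(): the empty first list takes the default label
withTiming() {
    println("business logic goes here")
}
```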

```scala
package com.rocket.spark.core.framework.common

trait TController {

    // Every controller must implement dispatch
    def dispatch(): Unit
}
```

```scala
package com.rocket.spark.core.framework.common

import com.rocket.spark.core.framework.utils.EnvUtil
import org.apache.spark.rdd.RDD

trait TDao {

    // Common data-access helpers shared by all DAOs
    def readFile(path: String): RDD[String] = {
        val fileRdd: RDD[String] = EnvUtil.getSc().textFile(path)
        fileRdd
    }
}
```
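textFile is lazy: readFile only builds the RDD lineage, and no data is actually read until an action such as collect() runs in the service layer.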

```scala
package com.rocket.spark.core.framework.common

trait TService {

    // Every service must implement analyzer
    def analyzer(): Any
}
```

3.6 Utils Layer

```scala
package com.rocket.spark.core.framework.utils

import org.apache.spark.SparkContext

object EnvUtil {

    // The whole flow runs on the driver's main thread, so the SparkContext can be
    // stashed in a ThreadLocal and fetched from anywhere on that thread.
    // Under the hood, each Thread has a threadLocals field of type
    // ThreadLocal.ThreadLocalMap, whose entries are keyed by the ThreadLocal
    // instance with the stored value as the entry value.
    private val tl = new ThreadLocal[SparkContext]

    def putSc(sc: SparkContext): Unit = {
        tl.set(sc)
    }

    def getSc(): SparkContext = {
        tl.get()
    }

    def clear(): Unit = {
        tl.remove()
    }
}
```
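Because the controller, service, and DAO all execute on the driver's main thread, whatever start() puts into the ThreadLocal is visible to all of them, and only to that thread. A standalone sketch (a hypothetical ThreadLocalDemo, not part of the project) illustrates the isolation:

```scala
object ThreadLocalDemo extends App {
    val tl = new ThreadLocal[String]
    tl.set("driver-main")
    println(tl.get()) // prints: driver-main

    // Another thread sees its own, empty slot for the same ThreadLocal
    new Thread(new Runnable {
        override def run(): Unit = println(tl.get()) // prints: null
    }).start()
}
```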