private[worker] def prepareAndRunDriver(): Int = {
  // Download the driver jar to the worker's local disk, i.e. /path/to/examples.jar in this example
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)

  // Substitute workerUrl and localJarFilename into the submitted arguments
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }
if (args.isStandaloneCluster && args.useRest) {
  try {
    printStream.println("Running Spark using the REST application submission protocol.")
    doRunMain()
  } catch {
    // omitted
  }
// In all other modes, just run the main class as prepared
} else {
  doRunMain()
}
if [ -f "${user_conf_dir}/spark-env.sh" ]; then # Promote all variable declarations to environment (exported) variables set -a . "${user_conf_dir}/spark-env.sh" set +a fi fi
if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2 echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2 exit 1 fi
if [ -d "$ASSEMBLY_DIR2" ]; then export SPARK_SCALA_VERSION="2.11" else export SPARK_SCALA_VERSION="2.10" fi fi
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
# Find assembly jar
# Locate the spark-assembly-1.5.0-hadoop2.4.0.jar file (taking Spark 1.5.0 as an example);
# this means the assembly jar does not need to be packaged with the application at submission time
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
  ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
  ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi
GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
# Check whether Spark has already been built
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
# There must be exactly one Spark assembly jar
if [ -d "$ASSEMBLY_DIR" ]; then
  ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
    echo "$ASSEMBLY_JARS" 1>&2
    echo "Please remove all but one jar." 1>&2
    exit 1
  fi
fi
# Resolve the full path of the assembly jar
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()  # create an array
while IFS= read -d '' -r ARG; do  # append each NULL-delimited token of build_command's output to the CMD array
  CMD+=("$ARG")
done < <(build_command "$@")
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
# (LAUNCHER_EXIT_CODE is taken from the last element of CMD in the full spark-class script, omitted here.)
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi
# If the launcher's exit code is non-zero, exit with that code
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi
def loadDataSet():
    # A toy transaction dataset used in the examples below
    dataSet = [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]
    return dataSet
Build the set C1 from the dataset; C1 is the set of all candidate itemsets of size 1.
def createC1(dataSet):
    C1 = []  # C1 is the set of all candidate itemsets of size 1
    for transaction in dataSet:
        for item in transaction:
            if not [item] in C1:
                C1.append([item])
    C1.sort()
    # use frozenset so we can use it as a key in a dict
    return list(map(frozenset, C1))
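
As a quick sanity check (this snippet is not part of the original listing), running createC1 on the sample data from loadDataSet yields the five single-item candidates:

# Sketch (not in the original article): build C1 for the toy dataset
dataSet = loadDataSet()
C1 = createC1(dataSet)
print(C1)
# [frozenset({1}), frozenset({2}), frozenset({3}), frozenset({4}), frozenset({5})]
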
From the candidate itemsets just built, select the ones whose support is at least the minimum support we specify.
# D is the dataset, Ck the candidate itemsets, and minSupport the user-defined minimum support
def scanD(D, Ck, minSupport):
    ssCnt = {}
    # Count how many transactions each candidate appears in
    for tid in D:
        for can in Ck:
            if can.issubset(tid):
                if can not in ssCnt:
                    ssCnt[can] = 1
                else:
                    ssCnt[can] += 1
    numItems = float(len(D))
    retList = []       # itemsets that meet the minimum support
    supportData = {}   # support of every candidate itemset
    for key in ssCnt:
        # Compute the support of each itemset
        support = ssCnt[key] / numItems
        if support >= minSupport:
            retList.insert(0, key)
        supportData[key] = support
    return retList, supportData
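
For example (again, not part of the original listing), scanning the toy dataset with a minimum support of 0.5 keeps {1}, {2}, {3} and {5} and drops {4}, which appears in only one of the four transactions:

# Sketch (not in the original article): filter C1 with a minimum support of 0.5
dataSet = loadDataSet()
D = list(map(set, dataSet))
L1, supportData = scanD(D, createC1(dataSet), 0.5)
# L1 contains frozenset({1}), frozenset({2}), frozenset({3}), frozenset({5});
# supportData[frozenset([4])] == 0.25, so {4} is excluded
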
3.1 Frequent itemsets
We factor the generation of frequent itemsets out into its own step. First we need a function that creates candidate itemsets by merging two smaller itemsets.
# Lk is a list of frequent itemsets and k the number of items in the itemsets to build next,
# e.g. {1, 2, 3} corresponds to k = 3
def aprioriGen(Lk, k):  # creates Ck
    retList = []
    lenLk = len(Lk)
    for i in range(lenLk):
        for j in range(i + 1, lenLk):
            # Merge two itemsets only when their first k-2 items are identical
            L1 = list(Lk[i])[:k-2]
            L2 = list(Lk[j])[:k-2]
            L1.sort()
            L2.sort()
            if L1 == L2:  # if first k-2 elements are equal
                retList.append(Lk[i] | Lk[j])  # set union
    return retList
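
A small illustration (not part of the original listing): with k = 2 the first k-2 items of every 1-itemset form the empty list, so every pair of frequent 1-itemsets is merged into a candidate 2-itemset:

# Sketch (not in the original article): merge frequent 1-itemsets into candidate 2-itemsets
L1 = [frozenset([1]), frozenset([2]), frozenset([3])]
print(aprioriGen(L1, 2))
# [frozenset({1, 2}), frozenset({1, 3}), frozenset({2, 3})]
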
Next, define the function that generates the frequent itemsets.
# Only the dataset and the minimum support need to be supplied
def apriori(dataSet, minSupport=0.5):
    C1 = createC1(dataSet)
    D = list(map(set, dataSet))
    L1, supportData = scanD(D, C1, minSupport)
    L = [L1]
    k = 2
    while (len(L[k-2]) > 0):
        Ck = aprioriGen(L[k-2], k)
        Lk, supK = scanD(D, Ck, minSupport)  # scan DB to get Lk
        supportData.update(supK)
        L.append(Lk)
        k += 1
    # Return the frequent itemsets and the support of every itemset
    return L, supportData
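
Putting the pieces together on the toy dataset (a sketch, not part of the original listing):

# Sketch (not in the original article): mine frequent itemsets from the toy dataset
dataSet = loadDataSet()
L, supportData = apriori(dataSet, minSupport=0.5)
# L[0] holds the frequent 1-itemsets, L[1] the frequent 2-itemsets, and so on;
# the largest frequent itemset for this data is {2, 3, 5} with support 0.5
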
# Arguments: the frequent itemsets, the support dictionary, and a user-defined minimum confidence;
# returns the list of rules that meet that confidence
def generateRules(L, supportData, minConf=0.7):  # supportData is the dictionary returned by scanD
    bigRuleList = []
    # Only itemsets with at least two elements, such as {1, 2} or {1, 2, 3}, can produce rules;
    # single-element itemsets such as {2} or {3} are skipped
    for i in range(1, len(L)):
        for freqSet in L[i]:
            H1 = [frozenset([item]) for item in freqSet]
            if (i > 1):
                rulesFromConseq(freqSet, H1, supportData, bigRuleList, minConf)
            else:
                calcConf(freqSet, H1, supportData, bigRuleList, minConf)
    return bigRuleList
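
generateRules depends on the calcConf and rulesFromConseq helpers, which are not shown in this section; assuming they are defined as in the rest of the article, a typical call looks like this (a sketch, not part of the original listing):

# Sketch (not in the original article); assumes calcConf and rulesFromConseq are defined elsewhere
L, supportData = apriori(loadDataSet(), minSupport=0.5)
rules = generateRules(L, supportData, minConf=0.7)
# With this data, rules such as {5} -> {2} (confidence 1.0) meet the 0.7 threshold
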