Building Hudi

Maven
Extract the archive

```bash
tar -zxvf /opt/software/apache-maven-3.6.1-bin.tar.gz -C /opt/module/
```
Configure the environment variables

```bash
vi /etc/profile
export MAVEN_HOME=/opt/module/apache-maven-3.6.1
export PATH=$PATH:$MAVEN_HOME/bin
```
Add the Aliyun mirror

```bash
vi /opt/module/apache-maven-3.6.1/conf/settings.xml
```

```xml
<mirror>
  <id>nexus-aliyun</id>
  <mirrorOf>central</mirrorOf>
  <name>Nexus aliyun</name>
  <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
```
Change the local repository

```bash
vi /opt/module/apache-maven-3.6.1/conf/settings.xml
```

```xml
<localRepository>/opt/software/RepMaven</localRepository>
```
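To confirm the Maven setup, reload the profile and check the version (a quick sanity check, assuming the paths above):

```bash
# apply the new environment variables, then verify Maven resolves
source /etc/profile
mvn -v   # should report Apache Maven 3.6.1
```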
Hudi
Extract the archive

```bash
tar -zxvf /opt/software/hudi-0.11.0.src.tgz -C /opt/module/
```
Edit the POM file

Update the dependency component versions:

```bash
vim /opt/module/hudi-0.11.0/pom.xml
```

```xml
<hadoop.version>3.1.3</hadoop.version>
<hive.version>3.1.2</hive.version>
```
Patch the source for Hadoop 3.x compatibility

```bash
vim /opt/module/hudi-0.11.0/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
```

The original line reads:

```java
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
```

Hadoop 3.x no longer provides the single-argument FSDataOutputStream constructor, so add null as a second (statistics) argument. After the change:

```java
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
```

The line is at roughly line 110 of the file; you can search for baos directly to make the edit.
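To locate the line without opening an editor, a quick search works (assuming the source tree sits at the path used above):

```bash
# find the constructor call that needs the extra null argument
grep -n "new FSDataOutputStream(baos" \
  /opt/module/hudi-0.11.0/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
```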
hudi-spark-bundle

Changing the Hive version to 3.1.2 introduces a conflict: Hive 3.1.2 pulls in Jetty 9.3, while Hudi itself uses Jetty 9.4. Modify the hudi-spark-bundle POM to exclude the lower Jetty version and add Jetty at the version Hudi specifies.
```bash
vim /opt/module/hudi-0.11.0/packaging/hudi-spark-bundle/pom.xml
```

```xml
<!-- Hive -->
<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-service</artifactId>
  <version>${hive.version}</version>
  <scope>${spark.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-service -->
  <exclusions>
    <exclusion>
      <artifactId>guava</artifactId>
      <groupId>com.google.guava</groupId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.pentaho</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-service-rpc</artifactId>
  <version>${hive.version}</version>
  <scope>${spark.bundle.hive.scope}</scope>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>${hive.version}</version>
  <scope>${spark.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-jdbc -->
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-metastore</artifactId>
  <version>${hive.version}</version>
  <scope>${spark.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-metastore -->
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-core</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <artifactId>guava</artifactId>
      <groupId>com.google.guava</groupId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-common</artifactId>
  <version>${hive.version}</version>
  <scope>${spark.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-common -->
  <exclusions>
    <exclusion>
      <groupId>org.eclipse.jetty.orbit</groupId>
      <artifactId>javax.servlet</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- add Jetty at the version Hudi specifies -->
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-server</artifactId>
  <version>${jetty.version}</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-util</artifactId>
  <version>${jetty.version}</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-webapp</artifactId>
  <version>${jetty.version}</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-http</artifactId>
  <version>${jetty.version}</version>
</dependency>
```
hudi-utilities-bundle

Modify the hudi-utilities-bundle POM the same way: exclude the lower Jetty version and add Jetty at the version Hudi specifies.
```bash
vim /opt/module/hudi-0.11.0/packaging/hudi-utilities-bundle/pom.xml
```

```xml
<!-- Hoodie -->
<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-common</artifactId>
  <version>${project.version}</version>
  <!-- exclusions added to hudi-common -->
  <exclusions>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-client-common</artifactId>
  <version>${project.version}</version>
  <!-- exclusions added to hudi-client-common -->
  <exclusions>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- Hive -->
<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-service</artifactId>
  <version>${hive.version}</version>
  <scope>${utilities.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-service -->
  <exclusions>
    <exclusion>
      <artifactId>servlet-api</artifactId>
      <groupId>javax.servlet</groupId>
    </exclusion>
    <exclusion>
      <artifactId>guava</artifactId>
      <groupId>com.google.guava</groupId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.pentaho</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-service-rpc</artifactId>
  <version>${hive.version}</version>
  <scope>${utilities.bundle.hive.scope}</scope>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>${hive.version}</version>
  <scope>${utilities.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-jdbc -->
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-metastore</artifactId>
  <version>${hive.version}</version>
  <scope>${utilities.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-metastore -->
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-core</artifactId>
    </exclusion>
    <exclusion>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <artifactId>guava</artifactId>
      <groupId>com.google.guava</groupId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>${hive.groupid}</groupId>
  <artifactId>hive-common</artifactId>
  <version>${hive.version}</version>
  <scope>${utilities.bundle.hive.scope}</scope>
  <!-- exclusions added to hive-common -->
  <exclusions>
    <exclusion>
      <groupId>org.eclipse.jetty.orbit</groupId>
      <artifactId>javax.servlet</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<!-- add Jetty at the version Hudi specifies -->
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-server</artifactId>
  <version>${jetty.version}</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-util</artifactId>
  <version>${jetty.version}</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-webapp</artifactId>
  <version>${jetty.version}</version>
</dependency>
<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-http</artifactId>
  <version>${jetty.version}</version>
</dependency>
```
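Before running the full build, the exclusions can be sanity-checked with Maven's dependency tree (note that the sibling Hudi modules must already be resolvable, e.g., after a first successful build):

```bash
# only the ${jetty.version} (9.4.x) artifacts should remain in each bundle
cd /opt/module/hudi-0.11.0
mvn dependency:tree -pl packaging/hudi-spark-bundle | grep -i jetty
mvn dependency:tree -pl packaging/hudi-utilities-bundle | grep -i jetty
```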
Run the build command

```bash
mvn clean package -DskipTests -Dspark3.1 -Dflink1.14 -Dscala-2.12 -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3
```
-DskipTests skips running the test cases but still compiles them, placing the class files under target/test-classes.
-Dmaven.test.skip=true skips running the tests and also skips compiling the test classes.
-Dspark3.1, -Dflink1.14, and -Dscala-2.12 select the versions of the corresponding components.
-Pflink-bundle-shade-hive3 is required; without it the build fails on the hudi-flink-bundle jar.
Note that the build command must be run from the Hudi root directory, as sketched below.
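Putting it together, a minimal sketch of the full build from the root directory (paths as assumed above):

```bash
# build from the Hudi source root; artifacts land under packaging/*/target
cd /opt/module/hudi-0.11.0
mvn clean package -DskipTests -Dspark3.1 -Dflink1.14 -Dscala-2.12 \
    -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3
# after a successful build, list the bundle jars
ls packaging/*/target/hudi-*-bundle_*.jar
```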
Running the example

Move the jar

The compiled jars are stored under packaging in the Hudi root directory; go into the packaging folder and locate hudi-spark3.1-bundle_2.12-0.11.0.jar.
```bash
cd /opt/module/hudi-0.11.0/packaging/hudi-spark-bundle/target
```
Then copy the jar into the jars folder under the Spark directory:
```bash
cp /opt/module/hudi-0.11.0/packaging/hudi-spark-bundle/target/hudi-spark3.1-bundle_2.12-0.11.0.jar /opt/module/spark-3.1.1-bin-hadoop3.2/jars/
```
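A quick check that the bundle is now on Spark's default classpath (Spark home as assumed above):

```bash
ls /opt/module/spark-3.1.1-bin-hadoop3.2/jars | grep hudi
```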
Start spark-shell

```bash
spark-shell \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
```
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' configures the serializer Hudi requires.
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' integrates Hudi with Spark SQL.
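If you would rather not copy the bundle into Spark's jars directory, the jar can be attached per session instead (an alternative sketch, using the paths assumed above):

```bash
# load the Hudi bundle for this session only instead of installing it globally
spark-shell \
  --jars /opt/module/hudi-0.11.0/packaging/hudi-spark-bundle/target/hudi-spark3.1-bundle_2.12-0.11.0.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
```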
Run the example

```scala
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// generate 10 sample trip records and load them into a DataFrame
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

// write the DataFrame out as a copy-on-write Hudi table
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// snapshot query: load the table back and filter with Spark SQL
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
```
Result: the query prints fare, begin_lon, begin_lat, and ts for the generated trips with fare > 20.0.
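To confirm what was written, you can inspect the table's layout on the local filesystem (the base path comes from the example above; partition directory names depend on the generated data):

```bash
# a Hudi table is partition directories plus a .hoodie metadata folder
ls /tmp/hudi_trips_cow            # partitionpath directories (e.g., americas, asia)
ls /tmp/hudi_trips_cow/.hoodie    # commit timeline and table metadata
```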