Google Data Fusion构建数据ETL任务

2023-09-13 17:42:38

Google云平台提供了一个Data Fusion的产品,是基于开源的CDAP做的一个图形化的编辑工具,可以很方便的来完成数据处理的任务,而无需编写代码。假设我们现在要构建一个ETL的任务,从Kafka中消费一些数据,经过处理之后把数据存放到Bigquery中。

首先我们要准备一些测试数据发送到Kafka。这里我是在GKE的环境中起了一个Kafka的pod,然后往test topic发送了一些简单的JSON格式的消息。

创建Data Fusion Instance

在GCP的console页面中打开Data Fusion,选择Create an instance,在配置页面中,版本我没有选择最新的6.9.2版本,因为发现这个版本在解析JSON格式会有问题,我选择的是6.8.3。然后在Advanced Option里面,我选择了Enable Private IP,因为我的Kafka是没有暴露对外的公网IP,因此Data Fusion Instance也只能用Private IP来和Kafka进行通讯。在Associating Network里面,选择需要在哪个VPC里面来分配这个Private IP。之后点击Create就可以创建一个Instance了。

VPC Network Peering

Instance的创建需要等待一段时间,等完成之后,点击Instance的名字,我们可以看到相关的信息,拷贝Instance的Tenant project ID。然后去到VPC Network里面,建立一个VPC network peering。因为Data Fusion需要把任务安排在Dataproc集群上运行,这个集群是运行在单独的网络中,如果要和我们的Kafka通讯,需要和Kafka所在的VPC网络建立一个peering。选择create peering connection,在里面的Your VPC network里面,输入Kafka所在的VPC network,在peered network里面选择in another project,输入刚才拷贝的Tenant project ID,勾选Exchange IPv4 custom routes里面的Export custom routes,然后点击create进行创建。

防火墙规则设置

因为我的VPC network里面的防火墙规则,出于安全考虑,把default-allow-internal这条规则给删掉了。但是Dataproc的集群里面的VM需要互相之间能进行通讯,因此我们需要加上一条规则。在VPC network的Firewall rule里面,添加一条规则,其中Direction选择ingress,Sources和Targets里面我们都设置为Tag,tag的名称是Dataproc,这个tag将在稍后Data Fusion的compute profile里面设置。

设置ETL Pipeline

在Data Fusion的Instances列表里面,选择View instance打开刚才我们创建的Instance,然后点击Wrangler,在里面选择Add connection,然后选择Kafka Connection,在Kafka Brokers里面输入地址,例如10.0.0.100:9094,然后点击Test connection。如果之前的网络设置都完成的话,应该是能正确连接的。之后Wrangler就会打开这个Kafka连接,然后可以看到这个Kafka里面的所有的topic。点击我们要测试的那个topic,就可以看到里面已有的数据,然后我们可以点击左上角Message的小箭头,在下拉菜单中选择Parse as JSON,这样子就可以直接把JSON里面的字段解析为对应的字段了,在右边的界面列出了解析出的对应字段,这里我需要修改一下字段的名称,使得字段名和我之后创建Bigquery的表里面的字段名对应。

数据解析没有问题之后,可以在直接点击右上角的create pipeline来创建一个任务了。

在新打开的Pipeline的编辑窗口中,我们可以看到目前有两个步骤存在,一个是Kafka connection,另一个是Wrangler。我们需要再增加一个步骤,把Wrangler解析的数据保存到Bigquery里面。在左边菜单的Sink里面,点击Bigquery,然后把Wrangler框的箭头拖动连接到这个新加的Bigquery。点击Bigquery的properties,选择Use connection,然后选择之前已经创建好的bigquery的数据集和数据表即可。主要Wrangler输出的字段名需要和数据表的字段名匹配。

Pipeline设置好之后,我们就可以选择Deploy来部署了。

设置Compute Profile

点击Data Fusion右上角的System admin,然后在Configuration里面的System compute profiles里面,我们可以新建一个profile。

在Profile里面我们可以设置要部署的Dataproc集群的机器的配置。在General setting的subnet里面,我需要设置正确的subnet,因为在我的VPC网络里面有多个subnet,不同的subnet的策略是不同的。我需要设置kafka所在的那个subnet。在Cluster metadata的network tag里面,输入我们在配置防火墙规则是设定的tag Dataproc。这样就可以让我们的cluster能应用到那条规则。

把新建的profile设置为default之后,我们就可以运行Pipeline了。点击界面上的Run即可。打开Log,我们可以看到整个Pipeline运行的情况。整个Pipeline成功运行完毕之后,我们可以打开Bigquery的对应的数据表,可以看到数据能成功的从kafka消费之后写入Bigquery的表中。

更多推荐

穿越两大空间的调用栈

人是有经历的,软件也如此。简历记录着一个人的经历,而调用栈(callstack)则记录着软件的经历。看一个人的简历可以快速了解一个人。观察调用栈,则可以快速理解软件。因为此,我非常喜欢看软件的调用栈。每当看到一个漂亮的调用栈,我常常如获至宝,端详许久。因为对调试技术的热爱,这些年,我花了很多时间在调试器上。特别是开发了

基于movie lens-100k数据集的协同过滤算法实现

基于movielens-100k数据集的协同过滤算法实现数据集处理基于用户的协同过滤算法的实现基于物品的协同过滤算法的实现数据集处理importpandasaspdu_data=pd.read_csv('D:/PyCharmWorkSpace/ml-100k/ml-100k/u.data')u_genre=pd.rea

使用qt完善对话框功能

1、完善登录框点击登录按钮后,判断账号(admin)和密码(123456)是否一致,如果匹配失败,则弹出错误对话框,文本内容“账号密码不匹配,是否重新登录”,给定两个按钮ok和cancel,点击ok后,会清除密码框中的内容,继续进行登录;如果点击cancel按钮,则关闭界面。如果账号和密码匹配,则弹出信息对话框,给出提

zookeeper + kafka

Zookeeper概述Zookeeper是一个开源的分布式服务管理框架。存储业务服务节点元数据及状态信息,并负责通知再ZooKeeper上注册的服务几点状态给客户端Zookeeper工作机制Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观

Spring 框架的 MethodInterceptor 简介

org.springframework.cglib.proxy.MethodInterceptor是CGLIB库(CodeGenerationLibrary)中的一个接口,用于拦截方法的调用。CGLIB是一个用于生成Java字节码的代码生成库,它通常与SpringAOP一起使用,用于创建动态代理。MethodInter

IaaS,PaaS,SaaS 的区别

越来越多的软件,开始采用云服务。云服务只是一个统称,可以分成三大类。IaaS:基础设施服务,Infrastructure-as-a-servicePaaS:平台服务,Platform-as-a-serviceSaaS:软件服务,Software-as-a-serviceSaaS是软件的开发、管理、部署都交给第三方,不需

Java反序列化和PHP反序列化的区别

文章目录PHP反序列化漏洞什么是反序列化漏洞?修改序列化后的数据,目的是什么?Java反序列化漏洞那么漏洞点在哪里?漏洞成因什么是反序列化漏洞?反序列化存在的意义是为了数据传输,类是无法直接进行传输的。通过序列化后转换为字符串格式或者JSON格式进行传输。序列化与反序列化seriallization序列化:将对象转化为

Java多线程篇(4)——wait/notify和park/unPark

文章目录Object-wait/notifyobject.wait()object.notify()LockSupport-park/unparkLockSupport.park()LockSupport.unPark()Object-wait/notifyobject.wait()ObjectSynchronizer

宏任务,微任务,事件循环event loop与promise、setTimeout、async、nextTick【超详细示例讲解】

目录js单线程宏任务:在主线程上排队执行的任务,顺序执行宏任务macrotask:setTimeout,setInterval定时事件,Ajax,DOM事件,script脚本的执行、I/O操作、UI渲染等。微任务:不进入主线程、而进入"微任务列表"的任务微任务microtask(异步):Promise、async/aw

聚合支付备案对聚合支付系统及安全有何要求?

聚合支付备案,依据《收单外包服务机构备案管理办法》《收单外包服务机构自律规范》《关于加强收单外包服务市场规范管理的意见》等政策,对聚合支付系统及安全要求如下:对聚合支付机构要求一是聚合支付机构应当具备必要的、独立的系统、设施和技术,提供安全、稳定且可持续的聚合支付技术服务。其中,独立的系统是指聚合支付系统逻辑独立并与其

芯科科技第二代平台的所有蓝牙片上系统均可支持蓝牙技术联盟的新功能和新标准

中国,北京-2023年9月21日–致力于以安全、智能无线连接技术,建立更互联世界的全球领导厂商SiliconLabs(亦称“芯科科技”,NASDAQ:SLAB),今日宣布其支持蓝牙技术联盟(BluetoothSIG)针对蓝牙网状网络(BluetoothMesh)实现的新功能增强,以及他们新的网络照明控制(NLC)标准,

热文推荐