JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

2024-03-04 0 1,129

DataHub青睐PythonAPI血缘与元数据操作

JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

虽然开源源码都有Java示例和Python示例:但是这个API示例数量简直是1:100的差距!!不知为何,项目使用Java编写,示例推送偏爱Python的官方;;;搞不懂也许就是开源官方团队写脚本的是Python一哥吧!

显然DataHub 更青睐于Python API对血缘与元数据操作

Java示例:屈指可数

JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

Python示例 就是海量丰富了

JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

目前Java示例就两个好用:

DatasetAdd.java 和 DataJobLineageAdd.java

(一)DatasetAdd.java 是设置元数据到Datahub


 private static void extractedTable() {
    String token = "";
    try (RestEmitter emitter =
        RestEmitter.create(b -> b.server("http://10.130.1.49:8080").token(token))) {
      MetadataChangeProposal dataJobIOPatch =
              new DataJobInputOutputPatchBuilder()
                      .urn(
                              UrnUtils.getUrn(
                                      "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)")) //这个是使用的JOB输入表级:中转处理任务
                      .addInputDatasetEdge(
                              DatasetUrn.createFromString(
                                      "urn:li:dataset:(urn:li:dataPlatform:mysql,JDK-Name,PROD)")) //这个是使用的JOB输入表级:入口节点
                      .addOutputDatasetEdge(
                              DatasetUrn.createFromString(
                                      "urn:li:dataset:(urn:li:dataPlatform:hive,JDK-Name,PROD)")) //这个是使用的JOB输入表级:出口节点
                      .addInputDatajobEdge(
                              DataJobUrn.createFromString(
                                      "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)")) // 这里定义字段列级别的血缘关系:中转处理任务
                      .addInputDatasetField(
                              UrnUtils.getUrn(
                                      "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,JDK-Name,PROD),userName)")) // 列字段的入口节点
                      .addOutputDatasetField(
                              UrnUtils.getUrn(
                                      "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,JDK-Name,PROD),userName)")) // 列字段的出口节点
                      .build();
      Future<MetadataWriteResponse> response = emitter.emit(dataJobIOPatch);
      System.out.println(response.get().getResponseContent());
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("Failed to emit metadata to DataHub"+ e.getMessage());
      throw new RuntimeException(e);
    }
  }

(二)DataJobLineageAdd.java 是设置元数据带JOB任务的血缘到Datahub

 public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException {
    // Create a DatasetUrn object from a string
    DatasetUrn datasetUrn = UrnUtils.toDatasetUrn("hive", "JDK-Mysql", "PROD");
    // Create a CorpuserUrn object from a string
    CorpuserUrn userUrn = new CorpuserUrn("ingestion");
    // Create an AuditStamp object with the current time and the userUrn
    AuditStamp lastModified = new AuditStamp().setTime(1640692800000L).setActor(userUrn);

    // Create a SchemaMetadata object with the necessary parameters
    SchemaMetadata schemaMetadata =
        new SchemaMetadata()
            .setSchemaName("customer")
            .setPlatform(new DataPlatformUrn("hive"))
            .setVersion(0L)
            .setHash("")
            .setPlatformSchema(
                SchemaMetadata.PlatformSchema.create(
                    new OtherSchema().setRawSchema("__RawSchemaJDK__")))
            .setLastModified(lastModified);

    // Create a SchemaFieldArray object
    SchemaFieldArray fields = new SchemaFieldArray();

    // Create a SchemaField object with the necessary parameters
    SchemaField field1 =
        new SchemaField()
            .setFieldPath("mysqlId")
            .setType(
                new SchemaFieldDataType()
                    .setType(SchemaFieldDataType.Type.create(new StringType())))
            .setNativeDataType("VARCHAR(50)")
            .setDescription(
                "Java用户mysqlId名称VARCHAR")
            .setLastModified(lastModified);
    fields.add(field1);

    SchemaField field2 =
        new SchemaField()
            .setFieldPath("PassWord")
            .setType(
                new SchemaFieldDataType()
                    .setType(SchemaFieldDataType.Type.create(new StringType())))
            .setNativeDataType("VARCHAR(100)")
            .setDescription("Java用户密码VARCHAR")
            .setLastModified(lastModified);
    fields.add(field2);

    SchemaField field3 =
        new SchemaField()
            .setFieldPath("CreateTime")
            .setType(
                new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new DateType())))
            .setNativeDataType("Date")
            .setDescription("Java用户创建时间Date")
            .setLastModified(lastModified);
    fields.add(field3);

    // Set the fields of the SchemaMetadata object to the SchemaFieldArray
    schemaMetadata.setFields(fields);

    // Create a MetadataChangeProposalWrapper object with the necessary parameters
    MetadataChangeProposalWrapper mcpw =
        MetadataChangeProposalWrapper.builder()
            .entityType("dataset")
            .entityUrn(datasetUrn)
            .upsert()
            .aspect(schemaMetadata)
            .build();

    // Create a token
    String token = "";
    // Create a RestEmitter object with the necessary parameters
    RestEmitter emitter = RestEmitter.create(b -> b.server("http://10.130.1.49:8080").token(token));
    // Emit the MetadataChangeProposalWrapper object
    Future<MetadataWriteResponse> response = emitter.emit(mcpw, null);
    // Print the response content
    System.out.println(response.get().getResponseContent());
    emitter.close();
  }

我们大多数时候不是需要带JOb的血缘关系

例如: 直接是表与表之间有关系

JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

python脚本这里不赘述:太多示例了。重点是Java这边怎么实现这个东西

参考DataJobLineageAdd示例:他这里核心分析

(1.1) 就是把血缘关系提交到Datahub

代码====>

Future<MetadataWriteResponse> response = emitter.emit(dataJobIOPatch);
System.out.println(response.get().getResponseContent());

分析====>

emitter.emit(?) 这个方法就是提交血缘关系;
里面填充好的就是血缘关系数据吧:示例是dataJobIOPatch 就是携带JOB的血缘关系数据;
 因为他初始化变量的时候就是DataJobInputOutputPatchBuilder构建的,见名知意就是JOb相关的

 MetadataChangeProposal dataJobIOPatch =
              new DataJobInputOutputPatchBuilder()......

所以我们是否是MetadataChangeProposal的实现替换为别的方式:找找源码

类比思想:看看同样的builder实现的地方有别的实现没有

JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

挑出了看着很像的实现:猜一下肯定是和JOB没关系了,而且是直接操作元数据的关系的
DatasetPropertiesPatchBuilder
EditableSchemaMetadataPatchBuilder
UpstreamLineagePatchBuilder

SO 简单改造一下 取名为:DataSetLineageAdd

@Slf4j
class DataSetLineageAdd {

  private DataSetLineageAdd() {}

  /**
   * Adds lineage to an existing DataJob without affecting any lineage
   *
   * @param args
   * @throws IOException
   * @throws ExecutionException
   * @throws InterruptedException
   */
  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException {
    extractedTable();
  }

  private static void extractedRow() {
   // 没有java版本。。。。
  }
  private static void extractedTable() {
    String token = "";
    try (RestEmitter emitter =
        RestEmitter.create(b -> b.server("http://10.130.1.49:8080").token(token))) {
      MetadataChangeProposal mcp =
              new UpstreamLineagePatchBuilder().
                      urn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:mysql,ctmop.assets_info,PROD)"))
                      .addUpstream(DatasetUrn.createFromString(
                                      "urn:li:dataset:(urn:li:dataPlatform:mysql,ctmop.operation_fee_info,PROD)"), DatasetLineageType.TRANSFORMED)
                      .build();
      Future<MetadataWriteResponse> response = emitter.emit(mcp);
      System.out.println(response.get().getResponseContent());
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("Failed to emit metadata to DataHub"+ e.getMessage());
      throw new RuntimeException(e);
    }
  }
}

表级血缘用JAVA代码就实现了;这是一个简单的Demo;更深入的拓展需要自行挖掘!!!

JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本

有人说表级血缘太low了,能不能做到JAVA的字段级血缘关系呢。。。。当然没问题

看我示例用的这个:UpstreamLineagePatchBuilder 他意思没有指定表级还是字段级;API 方法 addUpstream 和 urn都是泛用型,理论上都OK

分析:
表级的元数据: urn:li:dataset:(urn:li:dataPlatform:mysql,ctmop.assets_info,PROD) 这个样子
列级的元数据: urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,JDK-Name,PROD),userName) 这个样子

发现规律了:表级外面包一层urn:li:schemaField:XXXX,字段名 那不就是列字段了,。。。。。浅谈捯饬结束!!!

有问题还望大家指正:!!!

作者:隔壁老郭

资源下载此资源下载价格为1小猪币,终身VIP免费,请先
由于本站资源来源于互联网,以研究交流为目的,所有仅供大家参考、学习,不存在任何商业目的与商业用途,如资源存在BUG以及其他任何问题,请自行解决,本站不提供技术服务! 由于资源为虚拟可复制性,下载后不予退积分和退款,谢谢您的支持!如遇到失效或错误的下载链接请联系客服QQ:442469558

:本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可, 转载请附上原文出处链接。
1、本站提供的源码不保证资源的完整性以及安全性,不附带任何技术服务!
2、本站提供的模板、软件工具等其他资源,均不包含技术服务,请大家谅解!
3、本站提供的资源仅供下载者参考学习,请勿用于任何商业用途,请24小时内删除!
4、如需商用,请购买正版,由于未及时购买正版发生的侵权行为,与本站无关。
5、本站部分资源存放于百度网盘或其他网盘中,请提前注册好百度网盘账号,下载安装百度网盘客户端或其他网盘客户端进行下载;
6、本站部分资源文件是经压缩后的,请下载后安装解压软件,推荐使用WinRAR和7-Zip解压软件。
7、如果本站提供的资源侵犯到了您的权益,请邮件联系: 442469558@qq.com 进行处理!

猪小侠源码-最新源码下载平台 Java教程 JAVA API实现血缘关系Rest推送到DataHub V0.12.1版本 http://www.20zxx.cn/808744/xuexijiaocheng/javajc.html

猪小侠源码,优质资源分享网

常见问题
  • 本站所有资源版权均属于原作者所有,均只能用于参考学习,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,建议提前注册好百度网盘账号,使用百度网盘客户端下载
查看详情

相关文章

官方客服团队

为您解决烦忧 - 24小时在线 专业服务