Adong's Notes
## Setting Up Flink + Kafka with Docker (Part 2): A Shortcut Script for Batch-Executing SQL Jobs

- I use the fully managed edition of Alibaba Cloud Realtime Compute for Apache Flink and write my jobs in SQL, so I don't need a complex development environment. To run jobs quickly and conveniently, I prepared the following environment for rapid SQL job development.
- Create a project in IntelliJ IDEA
- pom.xml
- [![install](https://static.adong.wiki/static/images/md/2021010201.png)](https://static.adong.wiki/static/images/md/2021010201.png)

``` xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>adong.wiki</groupId>
    <artifactId>flink-sqlsubmit</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <flink.version>1.11.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <rocketmq.version>4.8.0</rocketmq.version>
        <commons-lang.version>2.5</commons-lang.version>
    </properties>

    <dependencies>
        <!-- "provided": the Flink runtime supplies the planner when the job runs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
```

- Once the Java code is ready, package it into a jar in IntelliJ IDEA
- Source code for reference: [https://gitee.com/adongge/flink](https://gitee.com/adongge/flink)
- Write sql-submit.sh

``` Bash
#!/bin/bash

# SQL files to submit, in order: source table, sink table, job
f0="/home/dev/sql/source.sql"
f1="/home/dev/sql/sink.sql"
f2="/home/dev/sql/job.sql"
sql_file="$f0,$f1,$f2"

# Jar built from the project above
AD_JAR=/home/dev/flink-sqlsubmit-1.0-SNAPSHOT.jar

# Submit with parallelism 1; -f passes the comma-separated file list to SqlSubmit
/home/flink/bin/flink run -p 1 -c local.sqlsubmit.SqlSubmit "$AD_JAR" -f "$sql_file"

# Run this script from the host:
# docker exec -it adflink /bin/bash -c '/home/dev/sql-submit.sh'
# List jobs from the host:
# docker exec -it adflink /bin/bash -c '/home/flink/bin/flink list'
```

- Write the SQL job files in the sql directory

``` SQL
-- Source table: /home/dev/sql/source.sql
create table test_source (
    device_number STRING
) with (
    'connector' = 'kafka',
    'topic' = 'testtopic',
    'properties.bootstrap.servers' = 'adkafka.com:9092',
    'properties.group.id' = 'testtopic_source',
    'format' = 'csv',
    'csv.field-delimiter' = '#',
    'csv.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
);

-- Sink table: /home/dev/sql/sink.sql
create table test_sink (
    device_number STRING
) with (
    'scan.fetch-size' = '1',
    'connector' = 'jdbc',
    'password' = '123456',
    'username' = 'root',
    'table-name' = 'test',
    'url' = 'jdbc:mysql://172.10.0.1:3308/kpr_db?rewriteBatchedStatements=true&serverTimezone=GMT%2B8'
);

-- Job: /home/dev/sql/job.sql
INSERT INTO test_sink(device_number)
SELECT device_number FROM test_source
```

- Run `docker exec -it adflink /bin/bash -c '/home/dev/sql-submit.sh'`
- [![install](https://static.adong.wiki/static/images/md/2021010202.png)](https://static.adong.wiki/static/images/md/2021010202.png)
- Check the job submission status at http://localhost:8081
- [![install](https://static.adong.wiki/static/images/md/2021010203.png)](https://static.adong.wiki/static/images/md/2021010203.png)
- With local development and testing in place, you can focus on the SQL jobs themselves. There is no Java code to write for each job, which keeps local testing and development quick and convenient.
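
The Java class packaged into the jar (`local.sqlsubmit.SqlSubmit`) is not shown in this post; the Gitee repository is the place to look for the real code. As a rough illustration only, and not the author's implementation, the sketch below shows one way such a tool might break the text of the `.sql` files into individual statements before submitting them. The class name `SqlStatementSplitter` and its `split` method are hypothetical. The sketch assumes comments occupy whole lines starting with `--` and that no `;` appears inside a string literal.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (an assumption, not the author's SqlSubmit):
 * turns the text of one or more SQL files into individual statements.
 */
public class SqlStatementSplitter {

    /** Splits SQL text on ';', dropping blank lines and full-line "--" comments. */
    public static List<String> split(String sqlText) {
        StringBuilder withoutComments = new StringBuilder();
        for (String line : sqlText.split("\\r?\\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("--")) {
                continue; // skip blank lines and whole-line comments
            }
            withoutComments.append(trimmed).append(' ');
        }

        List<String> statements = new ArrayList<>();
        // Naive split: assumes no ';' occurs inside a quoted string literal.
        for (String part : withoutComments.toString().split(";")) {
            String statement = part.trim();
            if (!statement.isEmpty()) {
                statements.add(statement);
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        // Small made-up input; table names here are placeholders.
        String sql = "-- source table\n"
                + "create table t (id STRING) with ('connector' = 'datagen');\n"
                + "INSERT INTO s SELECT id FROM t\n";
        for (String statement : split(sql)) {
            System.out.println(statement);
        }
    }
}
```

In a real submitter, each returned statement could then be passed to Flink's `TableEnvironment.executeSql(String)`, which is available in Flink 1.11. That step is left out here so the sketch runs with the JDK alone.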