A list of the bugs I have run into, with notes; I will keep adding to it over time.
tableEnv.registerTable("result_agg", talbe);
The line above works. If instead you write:
tableEnv.registerTable("result", talbe);
then the following error is thrown; result appears to be a reserved keyword in the Calcite SQL parser, so the generated "from result" clause cannot be parsed:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL parse failed. Encountered "from result" at line 1, column 13. Was expecting one of: <EOF> "ORDER" ... "LIMIT" ... "OFFSET" ... "FETCH" ... "FROM" <IDENTIFIER> ... "FROM" <QUOTED_IDENTIFIER> ... "FROM" <BACK_QUOTED_IDENTIFIER> ... "FROM" <BRACKET_QUOTED_IDENTIFIER> ... "FROM" <UNICODE_QUOTED_IDENTIFIER> ... "FROM" "LATERAL" ... "FROM" "(" ... "FROM" "UNNEST" ... "FROM" "TABLE" ... "," ... "AS" ... <IDENTIFIER> ... <QUOTED_IDENTIFIER> ... <BACK_QUOTED_IDENTIFIER> ... <BRACKET_QUOTED_IDENTIFIER> ... <UNICODE_QUOTED_IDENTIFIER> ...
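A minimal sketch of the working pattern, using the older Java Table API that this post appears to use (registerTable / sqlQuery); the class name, variable names and import paths are assumptions, not from the original code:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterTableExample {
    // Register the intermediate result under a non-reserved name and use
    // exactly that name in the SQL text; plain "result" trips the parser.
    public static Table query(StreamTableEnvironment tableEnv, Table aggTable) {
        tableEnv.registerTable("result_agg", aggTable);
        return tableEnv.sqlQuery("select * from result_agg");
    }
}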
Caused by: java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:33) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:161)
The column here is a MySQL tinyint(1), which is automatically mapped to Boolean (see note a further down), while the sink expects a String. The conversion is simple, for example in SQL:
select project_fid, cast(project_info_type as CHAR) as type from table
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error while applying rule FlinkLogicalJoinConverter, args [rel#56:LogicalJoin.NONE(left=rel#52:Subset#0.NONE,right=rel#55:Subset#2.NONE,condition==($0, $2),joinType=inner)]
This bug has not been fixed yet; see https://issues.apache.org/jira/browse/FLINK-11433
The only way around it is to turn the map-typed data into a string, using a user-defined function:
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Map;

public class MapToString extends ScalarFunction {
    // Joins the keys of the map into a comma-separated string.
    public String eval(Map<String, Integer> map) {
        if (map == null || map.size() == 0) {
            return "";
        }
        StringBuffer sb = new StringBuffer();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            sb.append(entry.getKey()).append(",");
        }
        String result = sb.toString();
        return result.substring(0, result.length() - 1);
    }
}
It is called like this:
select id, mapToString(collect(type)) as type from table group by id
And of course you also need to register it:
tableEnv.registerFunction("mapToString", new MapToString());
I keep running into type conversion errors lately; so far I have found two, recorded here:
a. If a column is tinyint(1), it is automatically converted to boolean. Besides the solution above, a more elegant fix is to change the MySQL connection URL and add the parameter tinyInt1isBit=false (mind the capitalization; see the sketch after note b).
b. Sometimes the id column in a MySQL database is clearly int, yet Flink treats it as long. MyBatis apparently had a similar problem (https://blog.csdn.net/ahwsk/article/details/81975117).
Later I took a closer look at the table design (someone else's table) and found that the "unsigned" option was checked. When I removed that option and ran again, the error was gone, so the unsigned flag is what broke Flink's conversion: an unsigned column has one more bit of range than a signed one, and that extra bit can exceed the range of a Java int (Java ints are always signed), so the value gets promoted to long automatically.
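A minimal sketch of the connection URL change from note a; the host, database and credentials are placeholders, and the only part taken from the original is the tinyInt1isBit=false parameter:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class TinyIntConnectionExample {
    public static Connection open() throws SQLException {
        // tinyInt1isBit=false stops the MySQL driver from mapping
        // tinyint(1) columns to Boolean, so they arrive as integers.
        String url = "jdbc:mysql://localhost:3306/mydb?tinyInt1isBit=false";
        return DriverManager.getConnection(url, "user", "password");
    }
}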
Even though the fat jar already contained the class in question, the error still occurred; the final fix was to also put the relevant classes into Flink's lib directory, which solved the problem.
This error requires adding classloader.resolve-order: parent-first to flink-conf.yaml.
Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#66653408]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
When this error appears, add the following to the Flink configuration:
akka.ask.timeout: 120s
web.timeout: 120000
8. Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms
This error occurred while submitting a job. Use yarn logs -applicationId application_1565600987111 to pull the logs and find the cause. In my case the cause was that akka.watch.heartbeat.pause was smaller than akka.watch.heartbeat.interval; after changing it the error went away.
Alternatively, kill the CliFrontend process.
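For reference, a sketch of the flink-conf.yaml heartbeat settings mentioned above; the concrete values are assumptions (they match the usual Flink defaults), and the only requirement taken from the text is that the pause must be larger than the interval:

akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 60 s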
Exception while invoking ApplicationClientProtocolPBClientImpl.forceKillApplication over null. Retrying after sleeping for 30000ms. java.io.IOException: The client is stopped at org.apache.hadoop.ipc.Client.getConnection(Client.java:1519) at org.apache.hadoop.ipc.Client.call(Client.java:1381) at org.apache.hadoop.ipc.Client.call(Client.java:1345) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
This exception is, again, a memory problem. Check whether there is enough memory, and look at free rather than available; if you find that available is high while free is low (the memory is tied up in caches), run the following two commands to release it:
sync
echo 3 > /proc/sys/vm/drop_caches
10.
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 8081
at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161)
... 9 more
This error means the port is already in use. Looking at the source code:
Iterator<Integer> portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
}
The corresponding setting is rest.bind-port in flink-conf.yaml. If rest.bind-port is not set, the REST server binds to the rest.port port (8081) by default. rest.bind-port can be given as a list such as 50100,50101, or as a range such as 50100-50200; the range format is recommended to avoid port conflicts.
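For example, the range form quoted above as it would appear in flink-conf.yaml:

rest.bind-port: 50100-50200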