1. 读取 socket 数据:nc -l 9090
public static void main(String[] args) throws Exception { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 很是关键,必定要设置启动检查点!! env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setGlobalJobParameters(loadConfig("config.properties")); DataStream<String> text = env.socketTextStream("localhost", 9090, "\n"); DataStream stream = text.flatMap(new LineMapper()); //stream.addSink(new SinkTest()); // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程 //env.setParallelism(1); env.execute("WordCount from Kafka data"); } catch (Exception e) { System.out.println(e); } }
2. 读取文本文件
public static void main(String[] args) throws Exception { try { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 很是关键,必定要设置启动检查点!! env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setGlobalJobParameters(loadConfig("config.properties")); DataStream<String> text = env.readTextFile("/Users/duanxiaoqiu/test.txt"); //DataStream<String> text = env.socketTextStream("localhost", 9090, "\n"); DataStream stream = text.flatMap(new LineMapper()); //stream.addSink(new SinkTest()); // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程 //env.setParallelism(1); env.execute("WordCount from Kafka data"); } catch (Exception e) { System.out.println(e); } }
3. 读取其他类型文件