Flink读取本地文件

1. 读取 socket 数据(先在本地执行 nc -l 9090 启动监听端口)

/**
 * Builds and runs a streaming job that reads newline-delimited text from a socket on
 * {@code localhost:9090} (for example one opened with {@code nc -l 9090}) and passes
 * each line through {@code LineMapper}.
 *
 * <p>Failures are propagated to the caller instead of being caught and printed, so the
 * full stack trace is reported rather than only the exception's {@code toString()}.
 *
 * @param args command-line arguments; not used
 * @throws Exception if configuring or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // Very important: checkpointing must be enabled!!
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));

    // Source: lines of text from localhost:9090, split on "\n".
    DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");

    DataStream stream = text.flatMap(new LineMapper());

    // No sink is attached yet; uncomment to send the mapped records to SinkTest.
    //stream.addSink(new SinkTest());

    // Print the results to the console; note that printing is meant to be
    // single-threaded here, not multi-threaded.
    //env.setParallelism(1);

    // NOTE(review): the job name mentions Kafka although the source is a socket - confirm.
    env.execute("WordCount from Kafka data");
}

2. 读取文本文件

/**
 * Builds and runs a streaming job that reads the local text file
 * {@code /Users/duanxiaoqiu/test.txt} and passes each line through {@code LineMapper}.
 *
 * <p>Failures are propagated to the caller instead of being caught and printed, so the
 * full stack trace is reported rather than only the exception's {@code toString()}.
 *
 * @param args command-line arguments; not used
 * @throws Exception if configuring or executing the job fails
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // Very important: checkpointing must be enabled!!
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));

    // Source: the lines of a local text file.
    DataStream<String> text = env.readTextFile("/Users/duanxiaoqiu/test.txt");
    // Alternative source: lines of text from a socket on localhost:9090.
    //DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");

    DataStream stream = text.flatMap(new LineMapper());

    // No sink is attached yet; uncomment to send the mapped records to SinkTest.
    //stream.addSink(new SinkTest());

    // Print the results to the console; note that printing is meant to be
    // single-threaded here, not multi-threaded.
    //env.setParallelism(1);

    // NOTE(review): the job name mentions Kafka although the source is a file - confirm.
    env.execute("WordCount from Kafka data");
}

3. 读取其他类型的文件