Java网络编程(3):使用 UDP 探测局域网内特定类型的机器

记得之前咱们使用相似“快牙”这些文件分享工具的时候,一开始就是先在 手机A 上建立一个“房间”,而后链接上 手机A WiFi 热点的其余手机(即这些手机处于一个局域网内)就能够发现到这个房间并加入到这个房间里面,而后就能够互相分享文件了。那没有创建链接的状况下,“发现房间”这个功能是怎么实现的呢?
首先,既然 手机A 处于局域网中,那么根据 手机A 当前在局域网的 IP 地址和子网掩码,就能够得到这个局域网内全部机器的 IP 地址 的范围。若是在没有创建链接的状况下,手机A 就能够给这个范围内的每一个 IP 地址都发送一个消息 —— 那么若是某个 IP 地址的机器(设为 手机B)会对这个消息作出回应,便说明 手机B手机A 的“本身人”,那么 手机A 即可以告诉 手机B 它在当前的局域网建了一个“房间”,房间号是个啥,而后 手机B 能够选择是否加入到这个“房间”。java

  1. Java网络编程(1)中,咱们已经知道可使用 NetworkInterface 来得到机器在局域网内 IP 地址;
  2. Java网络编程(2)中,咱们知道使用 UDP,即可以在不创建链接的状况下,直接向某个 IP 地址发送消息;
  3. 若是每次都是遍历这个局域网内全部的 IP 地址,并使用 UDP 向每一个 IP 发送消息,那样就有点麻烦了。事实上,咱们可使用广播。每一个局域网都有一个对应的广播地址,向广播地址发送的数据包经过网关设备(好比路由器)时,网关设备会向局域网的每台设备发送一份该数据包的副本。经过 IP 和子网掩码计算广播地址的方法简单的形容就是 (IP地址)|(~子网掩码)—— 将子网掩码按位取反再和IP地址进行或运算,好比当前机器在局域网内的地址为 192.168.1.3,子网掩码为 255.255.255.0(取反后为 0.0.0.255),那么广播地址为 192.168.1.255。广播也是在不创建链接的状况下就发送数据,因此广播不能经过 TCP 实现,只能是 UDP。在 Java 中,经过 UDP 进行广播和单播(即只向一个 IP 地址发送数据包)的程序几乎没有区别,只是地址由一个特定的单播地址(如 192.168.1.3)变为了其对应的广播地址(192.168.1.255)。

如今让咱们来实现下面的功能:
一、Broadcaster 建立一个房间,并每隔 1 秒向局域网广播一个特定的消息;
二、同一个局域网的 Device 若是收到了 3 次这个特定的消息,以后便向 Broadcaster 发送加入房间的消息;
三、Broadcaster 收到 Device 请求加入房间的消息后,将 Device 加入房间。git

首先定义发送者类和接收者类,他们都实现了 Runnable,分别能够用来发送和接收:github

Sender.java编程

import java.io.IOException;
import java.net.*;

public class Sender implements Runnable {

    private static final byte[] EMPTY_DATA = new byte[0];

    private final DatagramSocket socket;
    private final SocketAddress broadcastAddress;
    private final long sendingInterval; // unit is ms

    public Sender(DatagramSocket socket,
            SocketAddress broadcastAddress, int sendingInterval) {
        this.socket = socket;
        this.broadcastAddress = broadcastAddress;
        this.sendingInterval = sendingInterval;
    }

    @Override
    public void run() {
        while (true) {
            byte[] data = getNextData();
            if (data == null || data.length == 0) {
                break;
            }

            DatagramPacket outPacket = new DatagramPacket(
                    data, data.length, broadcastAddress);
            try {
                socket.send(outPacket);
                System.out.println("Sender: Data has been sent");

                Thread.sleep(sendingInterval);
            } catch (IOException | InterruptedException ex) {
                System.err.println("Sender: Error occurred while sending packet");
                break;
            }

        }
        System.out.println("Sender: Thread is end");
    }

    /**
     * 得到下一次发送的数据<br>
     * 子类须要重写这个方法,返回下一次要发送的数据
     *
     * @return 下一次发送的数据
     */
    public byte[] getNextData() {
        return EMPTY_DATA;
    }
}

Receiver.javasegmentfault

import java.io.IOException;
import java.net.*;

public class Receiver implements Runnable {

    private final int BUF_SIZE = 512;

    private final DatagramSocket socket;

    public Receiver(DatagramSocket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        byte[] inData = new byte[BUF_SIZE];
        DatagramPacket inPacket = new DatagramPacket(inData, inData.length);

        while (true) {
            try {
                socket.receive(inPacket);
                if (!handlePacket(inPacket)) {
                    break;
                }
            } catch (IOException ex) {
                System.out.println("Receiver: Socket was closed.");
                break;
            }
        }
        System.out.println("Receiver: Thread is end");
    }

    /**
     * 处理接收到的数据报<br>
     * 子类须要重写这个方法,处理接收到的数据包,并返回是否继续接收
     *
     * @param packet 接收到的数据报
     * @return 是否须要继续接收
     */
    public boolean handlePacket(DatagramPacket packet) {
        return false;
    }
}

而后咱们定义 Device 和 Broadcaster:网络

Device.javasocket

import java.io.IOException;
import java.net.*;

public class Device {

    private static final int DEFAULT_LISTENING_PORT = 10000;

    private final InetAddress address;
    private final int port;

    private DatagramSocket socket;

    public Device(int port) throws IOException {
        this.port = port;
        this.address = InetAddress.getLocalHost();
    }

    public Device(InetAddress address, int port) {
        this.address = address;
        this.port = port;
    }

    public void start() throws SocketException, InterruptedException {
        System.out.println("Device has been started...");
        InetAddress lanAddr = LANAddressTool.getLANAddressOnWindows();
        if (lanAddr != null) {
            System.out.println("Device: LAN Address: " + lanAddr.getHostAddress());
        }

        socket = new DatagramSocket(port);
        Receiver receiver = new Receiver(socket) {
            int recvCount = 0;

            @Override
            public boolean handlePacket(DatagramPacket packet) {
                String recvMsg = new String(packet.getData(), 0, packet.getLength());
                if ("ROOM".equals(recvMsg)) {
                    System.out.printf("Device: Received msg '%s'\n", recvMsg);
                    recvCount++;
                    if (recvCount == 3) {
                        byte[] data = "JOIN".getBytes();
                        DatagramPacket respMsg = new DatagramPacket(
                                data, data.length, packet.getSocketAddress()); // 此时 packet 包含了发送者地址和监听端口
                        try {
                            socket.send(respMsg);
                            System.out.println("Device: Sent response 'JOIN'");
                        } catch (IOException ex) {
                            ex.printStackTrace(System.err);
                        }
                        return false; // 中止接收
                    }
                }
                return true;
            }
        };

        Thread deviceThread = new Thread(receiver);
        deviceThread.start(); // 启动接收数据包的线程
        deviceThread.join();

        close();

        System.out.println("Device has been closed.");
    }

    public void close() {
        if (socket != null) {
            socket.close();
        }
    }

    @Override
    public String toString() {
        return "Device {" + "address=" + address + ", port=" + port + '}';
    }

    public static void main(String[] args) throws Exception {
        Device device = new Device(DEFAULT_LISTENING_PORT);
        device.start();
    }
}

Broadcaster.javaide

import java.net.*;

public class Broadcaster {

    private static final int DEFAULT_BROADCAST_PORT = 10000;

    private final InetAddress bcAddr;
    private final int bcPort;

    private DatagramSocket socket;

    public Broadcaster(InetAddress broadcastAddress, int broadcastPort) {
        this.bcAddr = broadcastAddress;
        this.bcPort = broadcastPort;
    }

    public void start() throws SocketException, InterruptedException {
        System.out.println("Broadcaster has been started...");

        final Room room = new Room("Test");
        System.out.printf("Broadcaster: Created room '%s'\n\n", room.getName());

        socket = new DatagramSocket();
        SocketAddress bcSocketAddr = new InetSocketAddress(bcAddr, bcPort);

        Sender sender = new Sender(socket, bcSocketAddr, 1000) {// 每隔 1000ms 广播一次
            final byte[] DATA = "ROOM".getBytes();

            @Override
            public byte[] getNextData() {
                return DATA;
            }
        };

        Receiver recver = new Receiver(socket) {

            @Override
            public boolean handlePacket(DatagramPacket packet) {
                String recvMsg = new String(packet.getData(), 0, packet.getLength());
                if ("JOIN".equals(recvMsg)) {
                    Device device = new Device(packet.getAddress(), packet.getPort());
                    room.addDevice(device);
                    room.listDevices();
                }
                return true; // 一直接收
            }
        };

        Thread senderThread = new Thread(sender);
        Thread recverThread = new Thread(recver);
        senderThread.start(); // 启动发送(广播)数据包的线程
        recverThread.start(); // 启动接收数据包的线程

        senderThread.join();
        recverThread.join();

        close();
    }

    public void close() {
        if (socket != null) {
            socket.close();
        }
    }

    public static void main(String[] args) throws Exception {
        InetAddress bcAddr = LANAddressTool.getLANBroadcastAddressOnWindows();

        if (bcAddr != null) {
            System.out.println("Broadcast Address: " + bcAddr.getHostAddress());
            Broadcaster broadcaster = new Broadcaster(bcAddr, DEFAULT_BROADCAST_PORT);
            broadcaster.start();
        } else {
            System.out.println("Please check your LAN~");
        }
    }
}

Room.java工具

import java.util.*;

public class Room {

    private final String name;
    private final List<Device> devices;

    public Room(String name) {
        this.name = name;
        this.devices = new ArrayList<>();
    }

    public boolean addDevice(Device device) {
        return devices.add(device);
    }

    public String getName() {
        return name;
    }

    public void listDevices() {
        System.out.printf("Room (%s), current devices:\n", name);
        for (Device device : devices) {
            System.out.println(device);
        }
    }
}

(完整的 Demo 能够访问:https://github.com/mizhoux/LA...this

咱们将这个 Demo 打包成 jar,而后开始运行:
一、首先咱们在本机上启动 Broadcaster:
启动 Broadcaster

二、咱们将本机做为一个 Device 启动:
将本机做为一个 Device 启动

能够看到此时 Broadcaster 建立的房间已经有了一个 Device:
建立的房间已经有了一个 Device

三、咱们启动局域网内的另一台设备:
启动局域网内的另一台设备

此时 Broadcaster 建立的房间便有两个 Device:
建立的房间已经有了两个 Device

四、再启动局域网内的一台设备:
再启动局域网内的一台设备

此时房间里则有三个 Device:
房间里已经有了三个 Device

由于 UDP 在不须要创建链接的基础上就能够发送消息,因此它能够方便的用来探测局域网内特定类型的机器 —— 这是个颇有用的功能 —— 又好比一个集群当中可能会忽然有机器宕机,为了检测这一事件的发生,就须要集群 master机器 每隔必定的时间向每台机器发送若干心跳检测包,若是有回复说明机器正常,不然说明该机器出现了故障,此时不须要链接并且高效的 UDP 就十分适合这种场合。固然,咱们始终仍是要考虑到 UDP 是不可靠的协议,它并不能代替 TCP —— 永远须要根据环境,来选择最合适的技术。

相关文章
相关标签/搜索