The Docker image wurstmeister/kafka is the most starred Kafka image on hub.docker.com, but documentation of its usage is scarce, so in this post I will take some time to talk about how to use this Docker image.
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ADVERTISED_HOST_NAME=kafka \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_PORT=9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092 \
  -e KAFKA_CREATE_TOPICS="stream-in:2:1,stream-out:2:1" \
  --link zookeeper wurstmeister/kafka:1.1.0
The command above shows the following features of this image:
a) It specifies the advertised host name, which is normally the container name; that name is resolved to the container's actual IP address through the entry defined in the /etc/hosts file.
b) It specifies the ZooKeeper list that the Kafka cluster uses for cluster coordination. At least one ZooKeeper instance must already be running, or else Kafka will fail to start.
c) It specifies the port number through which the Kafka broker running in the container instance can be accessed.
d) KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS: define these two environment variables together. The start script exits with an error when KAFKA_ADVERTISED_LISTENERS is set but neither KAFKA_LISTENERS nor KAFKA_ADVERTISED_HOST_NAME is.
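1. advertised.listeners needs to be configured. If it is not configured, the listeners property is used. If listeners is not configured either, the value is obtained the default way, through java.net.InetAddress.getCanonicalHostName(), which is expected to return the hostname. But: host.name was initially bound only to the internal IP, so it could not be reached through the external network interface. Avoid registering the hostname of the Kafka broker machine in ZooKeeper.

2. Kafka's advertised.host.name parameter: configuration for external access

In Kafka's server.properties file, ```host.name``` was initially bound only to the internal IP, so it could not be reached through the external network interface. Setting the value to empty makes Kafka bind its listening port on all network interfaces. However, when accessing the broker from the external network, the client then hit a ```java.nio.channels.ClosedChannelException```. Analyzing the traffic with tcpdump on the server side showed that the client was passing along the machine name of the Kafka host. Stepping through the client with breakpoints showed that the metadata returned during findLeader contained the machine name rather than the IP. The client could not resolve this machine name, which caused the exception above.

server.properties has another parameter that solves this problem. The advertised.host.name parameter configures the host.name value that is returned to clients; setting it to the external IP address is enough. This parameter is not enabled by default; the default is the value returned by java.net.InetAddress.getCanonicalHostName. On my Mac this value is not the hostname but the IP, whereas on Linux it is the hostname.

Besides the IP there is also the PORT; the port exposed externally needs to be changed as well. The corresponding part of server.properties is shown below.

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

When a Kafka broker starts, it registers its own IP and port number in ZK, and clients connect through this IP and port number. In an IaaS environment such as AWS, the HostName returned by java.net.InetAddress.getCanonicalHostName looks like ip-172-31-10-199, a host name that is reachable only from inside the internal network, so the address registered in ZK by default is an internal IP that only internal clients can reach. In that case you need to set advertised.host.name and advertised.listeners explicitly, so that the IP registered in ZK is the external IP. For example, for the broker at IP 59.64.11.22, the following three settings need to be added to the server.properties configuration file:

advertised.host.name
advertised.listeners
advertised.port

New-style configuration:

advertised.listeners=PLAINTEXT://59.64.11.22:9092

Readers will probably be as confused as I was: why are three parameters needed to configure an IP and a port number, when advertised.listeners alone should do the job? I later found that the latest 0.10.x broker configuration deprecates the two settings advertised.host.name and advertised.port, so configuring advertised.listeners alone is enough.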
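The fallback order described above can be sketched in shell. This is only an illustration, not Kafka's own code: the function name effective_advertised_listener and its three arguments are hypothetical, the third argument merely stands in for the result of java.net.InetAddress.getCanonicalHostName(), and the PLAINTEXT protocol and port 9092 in the last branch are assumptions. It also simplifies by returning the listeners value unchanged when that is the one in effect.

```shell
#!/usr/bin/env bash
# Hypothetical helper that mimics the fallback order described in the text.
#   $1 = advertised.listeners value (may be empty)
#   $2 = listeners value (may be empty)
#   $3 = fallback host name (stand-in for getCanonicalHostName())
effective_advertised_listener() {
  local advertised="$1" listeners="$2" fallback_host="$3"
  if [[ -n "$advertised" ]]; then
    # An explicit advertised.listeners always wins.
    printf '%s\n' "$advertised"
  elif [[ -n "$listeners" ]]; then
    # Otherwise the listeners value is advertised.
    printf '%s\n' "$listeners"
  else
    # Last resort: the canonical host name (assumed protocol and port).
    printf 'PLAINTEXT://%s:9092\n' "$fallback_host"
  fi
}

# Explicit external address is advertised as-is.
effective_advertised_listener "PLAINTEXT://59.64.11.22:9092" "PLAINTEXT://:9092" "ip-172-31-10-199"
# Nothing configured: the internal-only host name leaks to clients.
effective_advertised_listener "" "" "ip-172-31-10-199"
```

The second call shows the AWS problem from the text: with nothing configured, clients are handed a host name they cannot resolve from outside.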
For details, please refer to my blog post.
In that blog post, there are several key points about installing a Kafka cluster:
a) The broker id, which is the unique id of the broker in the Kafka cluster.
b) zookeeper.connect, which points to the ZooKeeper ensemble that stores the cluster metadata; a Kafka cluster (even one with only a single Kafka node) depends on ZooKeeper to function.
c) The host.name property, which refers to the actual IP address or hostname (this property has been deprecated since Kafka 0.11); we can ignore it.
d) The values of advertised.host.name, advertised.listeners and advertised.port, which publish the service to clients, so that a client can access Kafka through the address and port number configured by these parameters.
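As a rough sketch of how these key points fit together, the shell function below prints a minimal server.properties fragment. The function name render_broker_config and its arguments are hypothetical, and only advertised.listeners is emitted for the advertised address, on the assumption that the deprecated advertised.host.name and advertised.port are not needed.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints a minimal server.properties fragment.
#   $1 = broker id
#   $2 = ZooKeeper connect string
#   $3 = host name or IP that clients can reach
#   $4 = port
render_broker_config() {
  local broker_id="$1" zk="$2" host="$3" port="$4"
  # Bind on all interfaces, but advertise the client-reachable address.
  printf 'broker.id=%s\n' "$broker_id"
  printf 'zookeeper.connect=%s\n' "$zk"
  printf 'listeners=PLAINTEXT://:%s\n' "$port"
  printf 'advertised.listeners=PLAINTEXT://%s:%s\n' "$host" "$port"
}

render_broker_config 1 zookeeper:2181 59.64.11.22 9092
```

The output could be appended to an existing server.properties file, although any key already present there would then appear twice.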
Before we go deep into the internals of the Kafka image, let's take a look at the entry-point script of the image:
#!/bin/bash -e

# Store original IFS config, so we can restore it at various stages
ORIG_IFS=$IFS

if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi

if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi

create-topics.sh &
unset KAFKA_CREATE_TOPICS

# DEPRECATED: but maintained for compatibility with older brokers pre 0.9.0 (https://issues.apache.org/jira/browse/KAFKA-1809)
if [[ -z "$KAFKA_ADVERTISED_PORT" && \
  -z "$KAFKA_LISTENERS" && \
  -z "$KAFKA_ADVERTISED_LISTENERS" && \
  -S /var/run/docker.sock ]]; then
    KAFKA_ADVERTISED_PORT=$(docker port "$(hostname)" $KAFKA_PORT | sed -r 's/.*:(.*)/\1/g')
    export KAFKA_ADVERTISED_PORT
fi

if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        # By default auto allocate broker ID
        export KAFKA_BROKER_ID=-1
    fi
fi

if [[ -z "$KAFKA_LOG_DIRS" ]]; then
    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi

if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi

if [[ -n "$HOSTNAME_COMMAND" ]]; then
    HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")

    # Replace any occurences of _{HOSTNAME_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
            eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

if [[ -n "$PORT_COMMAND" ]]; then
    PORT_VALUE=$(eval "$PORT_COMMAND")

    # Replace any occurences of _{PORT_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
            eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
    KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
    export KAFKA_BROKER_RACK
fi

# Try and configure minimal settings or exit with error if there isn't enough information
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo " Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi

    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi

#Issue newline to config file in case there is not one already
echo "" >> "$KAFKA_HOME/config/server.properties"

(
    # Read in env as a new-line separated array. This handles the case of env variables have spaces and/or carriage returns. See #313
    IFS=$'\n'
    for VAR in $(env)
    do
        if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
            kafka_name=$(echo "$VAR" | sed -r 's/KAFKA_(.*)=.*/\1/g' | tr '[:upper:]' '[:lower:]' | tr _ .)
            env_var=$(echo "$VAR" | sed -r 's/(.*)=.*/\1/g')
            if grep -E -q '(^|^#)'"$kafka_name=" "$KAFKA_HOME/config/server.properties"; then
                sed -r -i 's@(^|^#)('"$kafka_name"')=(.*)@\2='"${!env_var}"'@g' "$KAFKA_HOME/config/server.properties" #note that no config values may contain an '@' char
            else
                echo "$kafka_name=${!env_var}" >> "$KAFKA_HOME/config/server.properties"
            fi
        fi

        if [[ $VAR =~ ^LOG4J_ ]]; then
            log4j_name=$(echo "$VAR" | sed -r 's/(LOG4J_.*)=.*/\1/g' | tr '[:upper:]' '[:lower:]' | tr _ .)
            log4j_env=$(echo "$VAR" | sed -r 's/(.*)=.*/\1/g')
            if grep -E -q '(^|^#)'"$log4j_name=" "$KAFKA_HOME/config/log4j.properties"; then
                sed -r -i 's@(^|^#)('"$log4j_name"')=(.*)@\2='"${!log4j_env}"'@g' "$KAFKA_HOME/config/log4j.properties" #note that no config values may contain an '@' char
            else
                echo "$log4j_name=${!log4j_env}" >> "$KAFKA_HOME/config/log4j.properties"
            fi
        fi
    done
)

if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
    eval "$CUSTOM_INIT_SCRIPT"
fi

exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
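One detail of the script worth isolating is how a KAFKA_* environment variable becomes a server.properties key. The sketch below reuses the script's own sed and tr pipeline inside a hypothetical function named env_to_property_name; the only change is sed -E in place of sed -r, an assumption made so that the example also runs with non-GNU sed.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the name-mapping pipeline of the entry-point script.
#   $1 = one line of `env` output, e.g. KAFKA_BROKER_ID=1
env_to_property_name() {
  # Drop the KAFKA_ prefix and the value, lowercase the rest, turn "_" into ".".
  printf '%s\n' "$1" \
    | sed -E 's/KAFKA_(.*)=.*/\1/g' \
    | tr '[:upper:]' '[:lower:]' \
    | tr _ .
}

env_to_property_name 'KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092'   # prints advertised.listeners
env_to_property_name 'KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181'         # prints zookeeper.connect
```

This is why every -e KAFKA_... option passed to docker run ends up as the corresponding broker setting.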
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi

if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi
If the environment variable KAFKA_PORT is not provided, the default port number 9092 is used as the Kafka port.
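The same defaulting can be written with bash parameter expansion. The sketch below is an equivalent formulation, not the image's code, and the function name resolve_kafka_port is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: returns the given port, or 9092 when the argument is empty or unset.
resolve_kafka_port() {
  # ${1:-9092} expands to 9092 if $1 is unset or the empty string.
  printf '%s\n' "${1:-9092}"
}

resolve_kafka_port ""      # prints 9092
resolve_kafka_port 9094    # prints 9094
```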
if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        # By default auto allocate broker ID
        export KAFKA_BROKER_ID=-1
    fi
fi
If KAFKA_BROKER_ID is not provided, the script runs BROKER_ID_COMMAND when that is set; otherwise it uses -1, which means the broker id is generated automatically.
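To make the three cases concrete, here is a small sketch that follows the same decision order. The function name resolve_broker_id and the sample command "echo 42" are hypothetical; in practice BROKER_ID_COMMAND would be whatever command yields a unique id in your environment.

```shell
#!/usr/bin/env bash
# Hypothetical helper following the same order as the entry-point script.
#   $1 = explicit broker id (may be empty)
#   $2 = command that prints a broker id (may be empty)
resolve_broker_id() {
  local broker_id="$1" id_command="$2"
  if [[ -z "$broker_id" ]]; then
    if [[ -n "$id_command" ]]; then
      # Derive the id by running the supplied command.
      broker_id=$(eval "$id_command")
    else
      # -1 asks Kafka to allocate an id automatically.
      broker_id=-1
    fi
  fi
  printf '%s\n' "$broker_id"
}

resolve_broker_id "" ""           # prints -1
resolve_broker_id "" "echo 42"    # prints 42
resolve_broker_id 1 "echo 42"     # prints 1, an explicit id wins
```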
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi
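The snippet above rewrites the heap settings inside kafka-server-start.sh. The sketch below applies the same substitution to a sample line read from standard input instead of editing the file in place, so it can be tried safely. The function name apply_heap_opts is hypothetical, sed -E is used in place of sed -r for portability, and the sample line is an assumption about what the start script contains.

```shell
#!/usr/bin/env bash
# Hypothetical helper: applies the heap-option substitution to stdin.
#   $1 = new value for KAFKA_HEAP_OPTS
apply_heap_opts() {
  local heap_opts="$1"
  # Same expression as the entry-point script, minus the in-place flag.
  sed -E 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$heap_opts"'"/g'
}

# Assumed sample of the line found in kafka-server-start.sh.
echo 'export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"' | apply_heap_opts "-Xmx512M -Xms512M"
# prints: export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
```

Because the value is spliced directly into the sed expression, a value containing "/" or "&" would change the meaning of the substitution.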
3.5 The configuration of KAFKA_ADVERTISED_HOST_NAME, KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS, HOSTNAME_VALUE
# Try and configure minimal settings or exit with error if there isn't enough information
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo " Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi

    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi
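The script above implies the following:
a) We should configure KAFKA_ADVERTISED_HOST_NAME (deprecated) or KAFKA_LISTENERS. At least one of them is required, unless HOSTNAME_COMMAND supplies a value and KAFKA_ADVERTISED_LISTENERS is not set.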