JUST WRITE

1개로 부족해, 2개의 IP로 접근하기 본문

MLOps/Kafka

1개로 부족해, 2개의 IP로 접근하기

천재보단범재 2023. 9. 5. 21:10

Kafka 설정 - Listener

1개로 부족해, 2개의 IP로 접근하기

요즘 Cloud 환경에서 Server를 세팅하는 경우가 많습니다.

Kafka 역시 Cloud 환경에서 운영하는 경우가 많습니다.

Cloud 환경에서는 보통 Public Network, Private Network로 구성됩니다.

AWS EC2 Network

보안적으로 Public, Private 나눠서 구성하는 것이 좋습니다.

Private Network는 내부에서만 접근하기 때문에 비교적 안전합니다.

그래도 외부에서 접근해야 될 경우도 있습니다.

Public, Private IP로 모두 접근이 필요한 경우가 많습니다.

Kafka도 Public, Private IP로 모두 접근하려면 어떻게 해야 할까요?

Kafka Public/Private IP로 접근

Kafka 설정 중 Listener 관련 설정을 세팅해 주면 됩니다.

Kafka Configuration 중 server.properties 파일을 수정해 줍니다.

아래와 같이 수정해 주면 됩니다.

listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:29092

advertised.listeners=INTERNAL_PLAINTEXT://${Private_IP}:9092,EXTERNAL_PLAINTEXT://${Public_IP}:29092

listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

inter.broker.listener.name=INTERNAL_PLAINTEXT

위에 수정한 설정값들을 하나하나 살펴보도록 하겠습니다.

listeners

  • default : PLAINTEXT://:9092

listenersKafka가 설치된 Server Network Interface에 바인딩되는 설정입니다.

기본값은 PLAITEXT://:9092인데 Kafka Service가 모든 IP의 9092 Port로 노출됩니다.

위 예시 0.0.0.0는 모든 IP를 의미합니다.

위 예시처럼 Kafka를 설정하고 netstat 명령어로 Network 상태를 확인하면 아래와 같습니다.

9092와 29092 Port로 모든 IP로 노출되고 있는 것을 확인할 수 있습니다.

PID가 5820이 Kafka 인 것을 확인할 수 있습니다.

$ netstat -nltp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -
tcp6       0      0 :::35389                :::*                    LISTEN      5820/java
tcp6       0      0 :::43703                :::*                    LISTEN      5428/java
tcp6       0      0 :::9092                 :::*                    LISTEN      5820/java
tcp6       0      0 :::22                   :::*                    LISTEN      -
tcp6       0      0 :::2181                 :::*                    LISTEN      5428/java
tcp6       0      0 :::29092                :::*                    LISTEN      5820/java

$ jps
5428 QuorumPeerMain
7285 Jps
5820 Kafka

특정 IP로 넣어서 설정하면 해당 IP로 바인딩됩니다.

아래는 AWS EC2 환경에서 진행하였습니다.

위 예제에서 listeners를 0.0.0.0이 아닌 Public IP와 Private IP로 설정하였습니다.

Private IP로는 노출이 확인되었지만 Public IP로는 안 됩니다.

Network Interface를 확인해 보면 Public IP는 확인할 수 없습니다.

해당 부분은 Cloud Network 관련 부분이므로 따로 정리해서 포스팅하겠습니다.

$ netstat -ntlp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -
tcp6       0      0 :::40627                :::*                    LISTEN      7961/java
tcp6       0      0 172.31.13.36:9092       :::*                    LISTEN      7961/java
tcp6       0      0 :::43703                :::*                    LISTEN      5428/java
tcp6       0      0 :::22                   :::*                    LISTEN      -
tcp6       0      0 :::2181                 :::*                    LISTEN      5428/java

$ ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens5: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 9001 qdisc mq state UP group default qlen 1000
    link/ether 02:ce:42:cb:be:54 brd ff:ff:ff:ff:ff:ff
    altname enp0s5
    altname eni-05125d7c926ed1996
    altname device-number-0
    inet 172.31.13.36/20 metric 512 brd 172.31.15.255 scope global dynamic ens5
       valid_lft 2981sec preferred_lft 2981sec
    inet6 fe80::ce:42ff:fecb:be54/64 scope link
       valid_lft forever preferred_lft forever
       
       
$ tail -200l logs/server.log
 org.apache.kafka.common.KafkaException: Socket server failed to bind to 3.35.132.242:29092: Cannot assign requested address.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:734)
        at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:637)
        at kafka.network.Acceptor.start(SocketServer.scala:632)
        at kafka.network.SocketServer.$anonfun$enableRequestProcessing$2(SocketServer.scala:222)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
        at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
        at kafka.network.SocketServer.chainAcceptorFuture$1(SocketServer.scala:215)
        at kafka.network.SocketServer.$anonfun$enableRequestProcessing$5(SocketServer.scala:229)
        at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4772)
        at kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:229)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:573)
        at kafka.Kafka$.main(Kafka.scala:113)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.net.BindException: Cannot assign requested address
        at java.base/sun.nio.ch.Net.bind0(Native Method)
        at java.base/sun.nio.ch.Net.bind(Net.java:461)
        at java.base/sun.nio.ch.Net.bind(Net.java:453)
        at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:227)
        at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:80)
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:730)

advertised.listeners

  • default : null

advertiesd.listenersBroker가 Client에게 알려주는 IP/Port 설정입니다.

해당 옵션을 따로 설정하지 않으면 listeners 옵션을 따릅니다.

listeners처럼 0.0.0.0 IP 사용은 불가능합니다.

Kafka Doc에서는 외부 LoadBalancer를 사용 시 설정하면 유용한 옵션이라고 합니다.

listener.security.protocol.map

  • default : PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

해당 옵션은 listener와 Security Protocol를 매핑해 주는 설정입니다.

${listener명}:${Protocol명} 이런 식으로 comma(,)로 구분해서 입력해 줍니다.

Kafka에서 지원하는 Security Protocol은 아래 4가지입니다.

  • PLAINTEXT
  • SASL_PLAINTEXT
  • SASL_SSL
  • SSL

Kafka Java Doc(https://docs.confluent.io/platform/current/clients/javadocs/javadoc/index.html)

위 예제에서 INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT를 추가하였습니다.

2가지 listener는 인증을 안 하는 PLAINTEXT로 설정하였습니다.

원하는 listener명을 Security Protocol에 추가가 가능합니다.

inter.broker.listener.name

  • default : null

Kafka는 다수의 Broker로 Cluster를 구성합니다.

Broker끼리 Partition Replica, Status Check 등 다양한 통신을 진행합니다.

해당 옵션은 Broker끼리 통신할 때 어떤 listener로 통신할지 설정하는 값입니다.

따로 값을 안 주면 security.inter.broker.protocol 값을 따릅니다.

위 예시에서는 Private IP로 Broker끼리 통신하게 설정해 보았습니다.

[참고사이트]

 

728x90
반응형
Comments