TCP 통신 개발 기록
TCP 소캣을 아직도 쓴다고? (웹소캣 얘기하는 거 아님)
- 아직도 쓴다
간단한 조정만 필요한 기기에 직접 통신할 경우에는 tcp 통신으로 제어한다.
어떻게 하지?
- 기기회사에서 친절하게 사용자매뉴얼 준다
ex.경광등매뉴얼
여기 다운로드에서 매뉴얼 - 3.Data format of Socket Program_V01(ENG) << 이게 핵심이다.
예시를 C++ 같은 거로만 주기 때문에 파이썬이나 자바 사용자는 당황스러울 수 있겠으나,
통신 자체가 복잡하지 않아 금방 할 수 있다.
python test (전체 내용은 첨부파일)
class LampCommandRequest:
"""램프 명령 요청 클래스"""
def __init__(self, lamp_ip: str, lamp_port: int,
lamp_color: LampColor, lamp_sound: LampSound,
lamp_light_type: LampLightType):
self.lamp_ip = lamp_ip
self.lamp_port = lamp_port
self.lamp_color = lamp_color
self.lamp_sound = lamp_sound
self.lamp_light_type = lamp_light_type
self.command = self._build_command()
def _resolve_light_type_byte(self) -> int:
"""라이트 타입을 바이트로 변환"""
if self.lamp_light_type == LampLightType.ON:
return 0x01
elif self.lamp_light_type == LampLightType.BLINK:
return 0x02
else: # OFF
return 0x00
def _resolve_color_bytes(self, light_type: int) -> bytes:
"""색상을 5바이트 배열로 변환"""
color = [0x00, 0x00, 0x00, 0x00, 0x00] # [RED, YELLOW, GREEN, BLUE, WHITE]
if self.lamp_color == LampColor.OFF:
return bytes(color)
elif self.lamp_color == LampColor.KEEP:
return bytes([0x03, 0x03, 0x03, 0x03, 0x03])
elif self.lamp_color == LampColor.RED:
color[0] = light_type
elif self.lamp_color == LampColor.YELLOW:
color[1] = light_type
elif self.lamp_color == LampColor.GREEN:
color[2] = light_type
elif self.lamp_color == LampColor.BLUE:
color[3] = light_type
elif self.lamp_color == LampColor.WHITE:
color[4] = light_type
return bytes(color)
def _resolve_sound_byte(self) -> int:
"""사운드를 바이트로 변환"""
sound_map = {
LampSound.OFF: 0x00,
LampSound.FIRE: 0x01,
LampSound.EMERGENCY: 0x02,
LampSound.AMBULANCE: 0x03,
LampSound.PI_PI_PI: 0x04,
LampSound.PI_CONTINUE: 0x05
}
return sound_map.get(self.lamp_sound, 0x00)
def _build_command(self) -> bytes:
"""10바이트 명령 패킷 생성"""
write = 0x57
type_byte = 0x00
light_type = self._resolve_light_type_byte()
color_bytes = self._resolve_color_bytes(light_type)
sound = self._resolve_sound_byte()
spare0 = 0x00
spare1 = 0x00
# 패킷 구조: [WRITE(1)] [TYPE(1)] [COLOR(5)] [SOUND(1)] [SPARE(2)]
command = struct.pack('BB', write, type_byte)
command += color_bytes
command += struct.pack('BBB', sound, spare0, spare1)
return command
def get_command_hex(self) -> str:
"""명령을 16진수 문자열로 반환"""
return ' '.join(f'{b:02X}' for b in self.command)
유의할 점은 너무 짧은 간격으로 여러 요청 시 디바이스에서 응답을 주지 않는 경우 발생.
간격을 0.1 ~5초 정도 줘야 씹히는 경우가 덜 발생한다. (디바이스 소프트웨어 별로 차이 있을듯...)
아래는 자바로 개발했던 경우를 요약 해놓은 자료이다.
세마포어 기반 TCP 명령 큐
[!summary]
소켓 하나, 세마포어 풀 하나, 그리고 혹독한 페이싱 전략으로 고집 센 램프 하드웨어를 말 잘 듣게 만든 기록입니다.
1. 큐가 필요했던 이유
- 램프는 초당 한 건의 명령만 수용하며, 이를 어길 경우 MCU가 전원 리셋 전까지 죽어버립니다.
- 색상, 사이렌, 센서 ping 등 API 계층에서 동시에 쏟아지는 호출을 안전하게 직렬화해야 했습니다.
- 순정 TCP 소켓에는 흐름 제어가 없으므로, 애플리케이션 차원에서 공정성을 구현했습니다.
2. 전략 요약
- 모든 디바이스를
ip:port키로 해시하고 각 키에 세마포어를 하나씩 부여합니다. - 5초 동안 permit 획득을 시도하고, 이미 사용 중이면 경고를 남기고 빠르게 종료합니다.
- 하드웨어 쿨다운 간격(
100 ms, 환경별 재정의 가능)을 반드시 지킵니다. - 페이로드를 흘려보내고 ACK/상태 바이트를 대기한 뒤, 어떤 경우에도 permit 을 반환합니다.
[!tip]
요청이 몰릴 때 세마포어 대기열 자체가 헬스 신호가 됩니다. Micrometer 지표로 노출하면 병목이 코드인지 배선인지 쉽게 구분할 수 있습니다.
3. 구현 스케치
public final class BeaconQueueWorker {
private static final long DEVICE_COOLDOWN_MS = 100L;
private static final ConcurrentMap<String, Semaphore> PERMITS = new ConcurrentHashMap<>();
private Semaphore permitFor(DeviceAddress device) {
return PERMITS.computeIfAbsent(device.key(), key -> new Semaphore(1, true));
}
@Async("eventDispatcher")
public CompletableFuture<Void> dispatch(DeviceCommand cmd) {
DeviceAddress device = cmd.target();
Semaphore lock = permitFor(device);
if (!lock.tryAcquire(5, TimeUnit.SECONDS)) {
log.warn("device queue saturated: {}", device);
return CompletableFuture.completedFuture(null);
}
try {
Thread.sleep(DEVICE_COOLDOWN_MS);
talkToSocket(device, cmd.payload());
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
lock.release();
}
return CompletableFuture.completedFuture(null);
}
private void talkToSocket(DeviceAddress device, byte[] payload) throws IOException {
try (Socket socket = new Socket()) {
socket.connect(device.toSocketAddress(), 5000);
socket.setSoTimeout(5000);
socket.getOutputStream().write(payload);
socket.getOutputStream().flush();
byte[] echo = socket.getInputStream().readNBytes(12);
log.debug("{} -> {}", device, Arrays.toString(echo));
}
}
}
주요 차별점:
Semaphore(1, true)로 공정한 큐를 보장하여 LIFO 깨짐 문제를 없앴습니다.- 파싱 전에 에코 바이트를 로그로 남겨, 펌웨어 변덕을 현장에서 빠르게 잡아낼 수 있었습니다.
4. 상태 프레임 파싱
public record StatusTelemetry(boolean ok, int red, int amber, int green, int blue, int siren) {
static StatusTelemetry from(byte[] frame) {
if (frame.length < 10) {
return new StatusTelemetry(false, 0, 0, 0, 0, 0);
}
return new StatusTelemetry(
frame[0] == 0x52,
Byte.toUnsignedInt(frame[2]),
Byte.toUnsignedInt(frame[3]),
Byte.toUnsignedInt(frame[4]),
Byte.toUnsignedInt(frame[5]),
Byte.toUnsignedInt(frame[7])
);
}
}
- 장비는 고정 10바이트 패킷(헤더, 사운드 그룹, 5개 램프 채널, 사운드 슬롯, 스페어 2바이트)을 돌려줍니다.
아래는 영어 버전
Semaphore-Governed TCP Command Queue
[!summary]
Lessons learned while keeping a noisy legacy lamp rig stable with nothing more than a TCP socket, a semaphore pool, and ruthless pacing.
1. Why a Queue Was Needed
- Each lamp accepts only one command per second; violating that limit bricks the MCU until power-cycled.
- Operators often trigger simultaneous actions (color, siren, sensor ping) that naturally fan out from the API tier.
- Raw sockets do not offer built-in flow control, so we had to build fairness ourselves.
2. Strategy In a Nutshell
- Hash every device by
ip:portand give it a dedicated permit. - Acquire the permit with a five-second patience window; bail out or log if the line is already busy.
- Sleep for the hardware cool-down interval (
100 msin production, overridable per environment). - Stream the payload, block for the ACK/status bytes, and release the permit no matter what.
[!tip]
When requests spike, the semaphore itself becomes a health signal. Expose queued callers via Micrometer so on-call engineers know whether the bottleneck is code or copper.
3. Implementation Sketch
public final class BeaconQueueWorker {
private static final long DEVICE_COOLDOWN_MS = 100L;
private static final ConcurrentMap<String, Semaphore> PERMITS = new ConcurrentHashMap<>();
private Semaphore permitFor(DeviceAddress device) {
return PERMITS.computeIfAbsent(device.key(), key -> new Semaphore(1, true));
}
@Async("eventDispatcher")
public CompletableFuture<Void> dispatch(DeviceCommand cmd) {
DeviceAddress device = cmd.target();
Semaphore lock = permitFor(device);
if (!lock.tryAcquire(5, TimeUnit.SECONDS)) {
log.warn("device queue saturated: {}", device);
return CompletableFuture.completedFuture(null);
}
try {
Thread.sleep(DEVICE_COOLDOWN_MS);
talkToSocket(device, cmd.payload());
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
lock.release();
}
return CompletableFuture.completedFuture(null);
}
private void talkToSocket(DeviceAddress device, byte[] payload) throws IOException {
try (Socket socket = new Socket()) {
socket.connect(device.toSocketAddress(), 5000);
socket.setSoTimeout(5000);
socket.getOutputStream().write(payload);
socket.getOutputStream().flush();
byte[] echo = socket.getInputStream().readNBytes(12);
log.debug("{} -> {}", device, Arrays.toString(echo));
}
}
}
Key differences from the legacy attempt:
Semaphore(1, true)enforces fairness instead of waking callers in LIFO order.- We log the echo bytes before parsing, which has already saved us from firmware quirks during field rollouts.
4. Parsing the Status Frame
public record StatusTelemetry(boolean ok, int red, int amber, int green, int blue, int siren) {
static StatusTelemetry from(byte[] frame) {
if (frame.length < 10) {
return new StatusTelemetry(false, 0, 0, 0, 0, 0);
}
return new StatusTelemetry(
frame[0] == 0x52,
Byte.toUnsignedInt(frame[2]),
Byte.toUnsignedInt(frame[3]),
Byte.toUnsignedInt(frame[4]),
Byte.toUnsignedInt(frame[5]),
Byte.toUnsignedInt(frame[7])
);
}
}