(Untitled)

import socket
import threading
import time
from datetime import datetime

class TCPDummyServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.server_socket = None
        self.running = False
        
    def start_server(self):
        """TCP 서버 시작"""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            self.running = True
            
            print(f"[{self.get_timestamp()}] TCP 더미 서버 시작: {self.host}:{self.port}")
            print(f"[{self.get_timestamp()}] 클라이언트 연결 대기 중...")
            print("-" * 60)
            
            while self.running:
                try:
                    client_socket, client_address = self.server_socket.accept()
                    print(f"[{self.get_timestamp()}] 새 클라이언트 연결: {client_address}")
                    
                    # 각 클라이언트를 별도 스레드에서 처리
                    client_thread = threading.Thread(
                        target=self.handle_client,
                        args=(client_socket, client_address)
                    )
                    client_thread.daemon = True
                    client_thread.start()
                    
                except socket.error as e:
                    if self.running:
                        print(f"[{self.get_timestamp()}] 소켓 에러: {e}")
                        
        except Exception as e:
            print(f"[{self.get_timestamp()}] 서버 시작 에러: {e}")
        finally:
            self.cleanup()
    
    def handle_client(self, client_socket, client_address):
        """클라이언트 연결 처리"""
        try:
            while self.running:
                # 데이터 수신 (최대 4096 바이트)
                data = client_socket.recv(4096)
                
                if not data:
                    print(f"[{self.get_timestamp()}] 클라이언트 {client_address} 연결 종료")
                    break
                
                # 수신된 데이터 분석 및 출력
                self.analyze_packet(data, client_address)
                
                # 더미 응답 전송 (선택사항)
                response = self.create_dummy_response(data)
                if response:
                    client_socket.send(response)
                    print(f"[{self.get_timestamp()}] 응답 전송 완료: {len(response)} bytes")
                    
        except socket.error as e:
            print(f"[{self.get_timestamp()}] 클라이언트 {client_address} 에러: {e}")
        finally:
            client_socket.close()
            print(f"[{self.get_timestamp()}] 클라이언트 {client_address} 소켓 닫힘")
    
    def analyze_packet(self, data, client_address):
        """수신된 패킷 분석"""
        timestamp = self.get_timestamp()
        data_length = len(data)
        
        print(f"\n{'='*60}")
        print(f"[{timestamp}] 패킷 수신 from {client_address}")
        print(f"데이터 길이: {data_length} bytes")
        print(f"{'='*60}")
        
        # 원시 바이트 데이터 출력 (16진수)
        print("📦 Raw Bytes (Hex):")
        hex_data = ' '.join([f'{b:02x}' for b in data])
        print(f"   {hex_data}")
        
        # ASCII로 해석 가능한 부분 출력
        print("\n📝 ASCII 해석:")
        try:
            ascii_data = data.decode('ascii', errors='replace')
            print(f"   {repr(ascii_data)}")
        except:
            print("   (ASCII 변환 불가)")
        
        # UTF-8로 해석 시도
        print("\n🔤 UTF-8 해석:")
        try:
            utf8_data = data.decode('utf-8', errors='replace')
            print(f"   {repr(utf8_data)}")
        except:
            print("   (UTF-8 변환 불가)")
        
        # HTTP 요청인지 확인
        if data.startswith(b'GET ') or data.startswith(b'POST ') or data.startswith(b'PUT '):
            print("\n🌐 HTTP 요청 감지:")
            lines = data.decode('utf-8', errors='replace').split('\r\n')
            for i, line in enumerate(lines[:10]):  # 처음 10줄만 출력
                print(f"   {i+1}: {line}")
        
        # JSON 데이터인지 확인
        if b'{' in data and b'}' in data:
            print("\n📋 JSON 데이터 감지 가능")
            try:
                import json
                json_str = data.decode('utf-8', errors='replace')
                if '{' in json_str:
                    start = json_str.find('{')
                    end = json_str.rfind('}') + 1
                    json_data = json.loads(json_str[start:end])
                    print(f"   파싱된 JSON: {json.dumps(json_data, indent=2, ensure_ascii=False)}")
            except:
                print("   (JSON 파싱 실패)")
        
        print(f"{'='*60}\n")
    
    def create_dummy_response(self, received_data):
        """더미 응답 생성"""
        # HTTP 요청에 대한 응답
        if received_data.startswith(b'GET ') or received_data.startswith(b'POST '):
            response = (
                b"HTTP/1.1 200 OK\r\n"
                b"Content-Type: application/json\r\n"
                b"Content-Length: 51\r\n"
                b"Connection: close\r\n"
                b"\r\n"
                b'{"status": "success", "message": "dummy response"}'
            )
            return response
        
        # 일반 TCP 데이터에 대한 에코 응답
        elif len(received_data) > 0:
            echo_response = b"ECHO: " + received_data
            return echo_response
        
        return None
    
    def get_timestamp(self):
        """현재 시간 문자열 반환"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    
    def stop_server(self):
        """서버 중지"""
        print(f"\n[{self.get_timestamp()}] 서버 중지 중...")
        self.running = False
        if self.server_socket:
            self.server_socket.close()
    
    def cleanup(self):
        """리소스 정리"""
        if self.server_socket:
            self.server_socket.close()
        print(f"[{self.get_timestamp()}] 서버 리소스 정리 완료")


def main():
    """메인 함수"""
    print("🚀 TCP 더미 서버")
    print("=" * 60)
    
    # 서버 설정
    host = input("호스트 (기본값: localhost): ").strip() or "localhost"
    port_input = input("포트 (기본값: 8888): ").strip()
    port = int(port_input) if port_input.isdigit() else 8888
    
    server = TCPDummyServer(host, port)
    
    try:
        # 서버 시작 (별도 스레드)
        server_thread = threading.Thread(target=server.start_server)
        server_thread.daemon = True
        server_thread.start()
        
        print("\n명령어:")
        print("  - 'quit' 또는 'q': 서버 종료")
        print("  - 'status': 서버 상태 확인")
        print("  - Enter: 계속 실행")
        print("-" * 60)
        
        # 사용자 입력 대기
        while True:
            user_input = input().strip().lower()
            if user_input in ['quit', 'q']:
                break
            elif user_input == 'status':
                status = "실행 중" if server.running else "중지됨"
                print(f"서버 상태: {status} ({host}:{port})")
            
    except KeyboardInterrupt:
        print("\n\n⚠️ Ctrl+C 감지됨")
    finally:
        server.stop_server()
        print("👋 서버 종료 완료")


if __name__ == "__main__":
    main()

Read more

CSTI

좋은 질문이에요 👍 CSTI(Client-Side Template Injection)와 SSTI(Server-Side Template Injection)는 모두 템플릿 엔진을 통한 코드 실행 취약점이라는 공통점이 있지만, 동작 환경(클라이언트/서버)에 따라 대응 방식이 달라집니다. 보안 전문가 관점에서 기술적 대응 + 운영적 대응을 함께 정리해드릴게요. ✅ CSTI 및 SSTI 보안 취약점 대응방안 1. 공통 대응방안 템플릿 엔진의

By ByteGaze