DATA/BYBIT/daemon/web_socket/bybit_total_striker.php
<?php
// ============================================================
// 타격기 : bybit_total_striker.php
// 웹소켓 연결 → 메모리 조립 → 0.9초마다 DB 저장
// 환경기에서 받는 값 : $pdo, $clean_symbols, $WS_URL
// ============================================================

// ── 1. 웹소켓 핸드셰이크 연결 ────────────────────────────────
if (!function_exists('ws_connect')) {
/**
 * Open a socket to a WebSocket endpoint and complete the HTTP/1.1 upgrade handshake.
 *
 * URLs with the `ws` scheme use plain TCP (default port 80); every other URL,
 * including one without a scheme, uses TLS (default port 443).
 *
 * @param string $url WebSocket endpoint URL.
 * @return mixed Connected stream resource, configured with a 0.1 s read timeout.
 * @throws RuntimeException If the URL has no host, the connection cannot be opened,
 *                          or the server does not answer with HTTP status 101.
 */
function ws_connect(string $url): mixed {
    $parsed = parse_url($url);
    if (!is_array($parsed) || !isset($parsed['host']) || $parsed['host'] === '') {
        throw new RuntimeException("소켓 연결 실패: 잘못된 URL ({$url})");
    }

    $host        = $parsed['host'];
    $path        = ($parsed['path'] ?? '/') . (isset($parsed['query']) ? '?' . $parsed['query'] : '');
    $secure      = strtolower($parsed['scheme'] ?? 'wss') !== 'ws';
    $defaultPort = $secure ? 443 : 80;
    $port        = $parsed['port'] ?? $defaultPort;
    $transport   = $secure ? 'ssl' : 'tcp';

    // NOTE(review): peer certificate verification is switched off here, so the TLS
    // session is not authenticated. Kept as-is to avoid breaking hosts without a CA
    // bundle; enable verify_peer / verify_peer_name once the deployment is confirmed.
    $ctx  = stream_context_create(['ssl' => ['verify_peer' => false, 'verify_peer_name' => false]]);
    $sock = stream_socket_client("{$transport}://{$host}:{$port}", $errno, $errstr, 10, STREAM_CLIENT_CONNECT, $ctx);
    if (!$sock) throw new RuntimeException("소켓 연결 실패: {$errstr} ({$errno})");

    try {
        // Generous timeout while the handshake is in flight; tightened below.
        stream_set_timeout($sock, 10);

        // HTTP upgrade request
        $key        = base64_encode(random_bytes(16));
        $hostHeader = ($port === $defaultPort) ? $host : "{$host}:{$port}";
        $handshake  = "GET {$path} HTTP/1.1\r\n";
        $handshake .= "Host: {$hostHeader}\r\n";
        $handshake .= "Upgrade: websocket\r\n";
        $handshake .= "Connection: Upgrade\r\n";
        $handshake .= "Sec-WebSocket-Key: {$key}\r\n";
        $handshake .= "Sec-WebSocket-Version: 13\r\n\r\n";
        if (fwrite($sock, $handshake) !== strlen($handshake)) {
            throw new RuntimeException("핸드셰이크 실패: 요청 전송 실패");
        }

        // Read the response header line by line so that bytes following the blank
        // line (the first WebSocket frame) stay in the stream buffer for ws_read().
        $response = '';
        $deadline = microtime(true) + 10.0;
        while (!str_ends_with($response, "\r\n\r\n")) {
            $line = fgets($sock, 8192);
            if ($line === false || $line === '') {
                throw new RuntimeException("핸드셰이크 실패: " . ($response === '' ? '응답 없음' : substr($response, 0, 200)));
            }
            $response .= $line;
            if (microtime(true) > $deadline || strlen($response) > 65536) {
                throw new RuntimeException("핸드셰이크 실패: " . substr($response, 0, 200));
            }
        }

        // Only the status line decides success; "101" elsewhere in the header does not count.
        if (preg_match('#^HTTP/\d(?:\.\d)? 101(?:\s|$)#', $response) !== 1) {
            throw new RuntimeException("핸드셰이크 실패: " . substr($response, 0, 200));
        }

        // 0.1 s read timeout so the caller's loop can poll without blocking for long.
        stream_set_timeout($sock, 0, 100000);
    } catch (Throwable $e) {
        // The caller never receives the handle on failure, so release it here.
        fclose($sock);
        throw $e;
    }

    return $sock;
}
}

// ── 2. 웹소켓 프레임 전송 ────────────────────────────────────
if (!function_exists('ws_send')) {
/**
 * Send one masked, unfragmented WebSocket text frame.
 *
 * @param mixed  $sock Writable stream resource.
 * @param string $data Payload bytes.
 * @throws RuntimeException If the frame cannot be written completely.
 */
function ws_send(mixed $sock, string $data): void {
    $len  = strlen($data);
    $mask = random_bytes(4);

    // String XOR stops at the shorter operand, so a key stream of at least $len
    // bytes masks the whole payload in one operation.
    $masked = $data ^ str_repeat($mask, intdiv($len + 3, 4));

    $header = "\x81"; // FIN + text frame
    if ($len < 126) {
        $header .= chr(0x80 | $len);
    } elseif ($len < 65536) {
        $header .= "\xFE" . pack('n', $len); // mask bit + 16-bit extended length
    } else {
        $header .= "\xFF" . pack('J', $len); // mask bit + 64-bit extended length
    }

    // fwrite() may accept only part of the buffer; keep writing until it is all
    // sent, and report a dead connection instead of ignoring it.
    $frame = $header . $mask . $masked;
    $total = strlen($frame);
    $sent  = 0;
    while ($sent < $total) {
        $written = fwrite($sock, $sent === 0 ? $frame : substr($frame, $sent));
        if ($written === false || $written === 0) {
            throw new RuntimeException('웹소켓 전송 실패');
        }
        $sent += $written;
    }
}
}

// ── 3. 웹소켓 프레임 수신 ────────────────────────────────────
if (!function_exists('ws_read')) {
/**
 * Read the next complete WebSocket data message from the stream.
 *
 * Control frames are handled here: a ping is answered with a pong, a pong is
 * ignored, and a close frame raises an exception. Fragmented messages are
 * reassembled before being returned.
 *
 * @param mixed $sock Stream resource, expected to have a short read timeout.
 * @return string|null Message payload, or null when no frame arrived before
 *                     the read timed out.
 * @throws RuntimeException If the peer closed the connection, a frame stalls
 *                          part-way through, or a frame exceeds the size limit.
 */
function ws_read(mixed $sock): ?string {
    $stallLimit = 5.0;              // seconds allowed to finish a frame once it has started
    $maxPayload = 64 * 1024 * 1024; // refuse implausible lengths rather than allocate them

    // Reads exactly $length bytes. A short read must never be discarded, because
    // losing even one byte misaligns every frame that follows. Returns null only
    // when $mayIdle is set and nothing at all has been consumed.
    $readExact = static function (int $length, bool $mayIdle) use ($sock, $stallLimit): ?string {
        $buffer   = '';
        $deadline = microtime(true) + $stallLimit;
        while (($missing = $length - strlen($buffer)) > 0) {
            $chunk = fread($sock, min($missing, 8192));
            if ($chunk !== false && $chunk !== '') {
                $buffer .= $chunk;
                continue;
            }
            // The metadata check does not block, unlike feof() on a socket.
            if (stream_get_meta_data($sock)['eof']) {
                throw new RuntimeException('웹소켓 연결이 종료되었습니다');
            }
            if ($mayIdle && $buffer === '') {
                return null;
            }
            if (microtime(true) >= $deadline) {
                throw new RuntimeException('웹소켓 프레임 수신 시간 초과');
            }
        }
        return $buffer;
    };

    $message    = '';
    $assembling = false; // true once a non-final data fragment has been consumed

    while (true) {
        $head = $readExact(2, !$assembling);
        if ($head === null) return null;

        $byte1      = ord($head[0]);
        $byte2      = ord($head[1]);
        $fin        = ($byte1 & 0x80) !== 0;
        $opcode     = $byte1 & 0x0F;
        $masked     = ($byte2 & 0x80) !== 0;
        $payloadLen = $byte2 & 0x7F;

        if ($payloadLen === 126) {
            $payloadLen = unpack('n', $readExact(2, false))[1];
        } elseif ($payloadLen === 127) {
            $payloadLen = unpack('J', $readExact(8, false))[1];
        }
        if ($payloadLen < 0 || $payloadLen > $maxPayload) {
            throw new RuntimeException('웹소켓 프레임 크기 초과');
        }

        $maskKey = $masked ? $readExact(4, false) : '';
        $payload = $readExact($payloadLen, false);
        if ($masked && $payloadLen > 0) {
            $payload ^= str_repeat($maskKey, intdiv($payloadLen + 3, 4));
        }

        if ($opcode === 0x8) { // close
            throw new RuntimeException('서버가 웹소켓 연결을 종료했습니다');
        }
        if ($opcode === 0x9) { // ping: echo the payload back in a masked pong frame
            $echo = substr($payload, 0, 125);
            $mask = random_bytes(4);
            fwrite($sock, "\x8A" . chr(0x80 | strlen($echo)) . $mask . ($echo ^ str_repeat($mask, 32)));
            continue;
        }
        if ($opcode === 0xA) { // pong: nothing to do
            continue;
        }

        // Text, binary or continuation frame.
        $message .= $payload;
        if ($fin) return $message;
        $assembling = true;
    }
}
}

// ── 4. DB UPDATE ─────────────────────────────────────────────
if (!function_exists('save_to_db')) {
/**
 * Write the in-memory order book of each symbol to the daemon_bybit_Ticker table.
 *
 * Symbols whose bids and asks are both empty are skipped. A failure for one
 * symbol is logged and does not stop the remaining symbols from being written.
 *
 * @param PDO   $pdo    Open database connection.
 * @param array $memory Map of symbol => ['bids' => list, 'asks' => list].
 */
function save_to_db(PDO $pdo, array $memory): void {
    $stmt = null; // prepared on first use, then reused for every symbol

    foreach ($memory as $symbol => $data) {
        // A side that has not been received yet is stored as an empty list.
        $bids = $data['bids'] ?? [];
        $asks = $data['asks'] ?? [];
        if (empty($bids) && empty($asks)) continue;

        try {
            $stmt ??= $pdo->prepare("
                UPDATE daemon_bybit_Ticker
                SET
                    bids = :bids,
                    asks = :asks
                WHERE symbol = :symbol
            ");
            // JSON_THROW_ON_ERROR: an encoding failure is logged below instead of
            // binding `false` into the row.
            $stmt->execute([
                ':bids'   => json_encode($bids, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR),
                ':asks'   => json_encode($asks, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR),
                ':symbol' => $symbol,
            ]);
        } catch (Throwable $e) {
            echo "[" . date('H:i:s') . "] DB 오류 [{$symbol}]: " . $e->getMessage() . "\n";
        }
    }
}
}

// ── 5. 메인 로직 (run_striker 함수로 감싸기) ──────────────────
if (!function_exists('striker_index_levels')) {
/** Turn a list of [price, size] levels into a map keyed by price; malformed entries are dropped. */
function striker_index_levels(array $levels): array {
    $indexed = [];
    foreach ($levels as $level) {
        if (is_array($level) && isset($level[0], $level[1]) && is_scalar($level[0])) {
            $indexed[(string) $level[0]] = $level;
        }
    }
    return $indexed;
}
}

if (!function_exists('striker_apply_delta')) {
/** Apply delta levels to one price-keyed side: size zero removes the level, anything else upserts it. */
function striker_apply_delta(array &$side, array $levels): void {
    foreach ($levels as $level) {
        if (!is_array($level) || !isset($level[0], $level[1]) || !is_scalar($level[0])) continue;

        $price = (string) $level[0];
        $size  = $level[1];

        if (is_numeric($size) && (float) $size === 0.0) {
            unset($side[$price]);
        } elseif (isset($side[$price])) {
            $side[$price][1] = $size;
        } else {
            $side[$price] = $level;
        }
    }
}
}

if (!function_exists('striker_apply_message')) {
/** Fold one decoded snapshot/delta message into the price-keyed books; other messages are ignored. */
function striker_apply_message(array &$books, array $msg): void {
    $data = $msg['data'] ?? null;
    if (!is_array($data)) return;

    $symbol = $data['s'] ?? ($data['symbol'] ?? null);
    if (!is_string($symbol) || $symbol === '') return;

    $type = $msg['type'] ?? null;
    if ($type !== 'snapshot' && $type !== 'delta') return;

    foreach (['bids' => ['b', 'bids'], 'asks' => ['a', 'asks']] as $side => [$short, $long]) {
        $levels = $data[$short] ?? $data[$long] ?? null;
        if (!is_array($levels)) continue;

        if ($type === 'snapshot') {
            $books[$symbol][$side] = striker_index_levels($levels);
        } else {
            // A delta may arrive before any snapshot; start from an empty side.
            $books[$symbol][$side] ??= [];
            striker_apply_delta($books[$symbol][$side], $levels);
        }
    }
}
}

if (!function_exists('striker_export_books')) {
/** Convert price-keyed books to the list shape save_to_db() stores: bids by descending price, asks ascending. */
function striker_export_books(array $books): array {
    $export = [];
    foreach ($books as $symbol => $sides) {
        $bids = array_values($sides['bids'] ?? []);
        $asks = array_values($sides['asks'] ?? []);
        usort($bids, static fn(array $l, array $r): int => (float) $r[0] <=> (float) $l[0]);
        usort($asks, static fn(array $l, array $r): int => (float) $l[0] <=> (float) $r[0]);
        $export[$symbol] = ['bids' => $bids, 'asks' => $asks];
    }
    return $export;
}
}

if (!function_exists('run_striker')) {
/**
 * Connect to the WebSocket endpoint, subscribe to the orderbook.200 topic of every
 * symbol, keep the order books in memory, and write them to the database at most
 * once every 0.9 seconds.
 *
 * The loop runs until an error occurs. The error is logged, the socket is closed,
 * and the function returns after a 3 second pause.
 *
 * @param PDO    $pdo           Open database connection passed on to save_to_db().
 * @param array  $clean_symbols Symbols to subscribe to; nothing is done when empty.
 * @param string $WS_URL        WebSocket endpoint URL.
 */
function run_striker(PDO $pdo, array $clean_symbols, string $WS_URL): void {

    if (empty($clean_symbols)) {
        echo "[" . date('H:i:s') . "] 활성 심볼 없음. 타격기 종료.\n";
        return;
    }

    $sock      = null;
    $books     = []; // symbol => ['bids' => [price => level], 'asks' => [price => level]]
    $lastSaved = 0.0;

    try {
        $sock = ws_connect($WS_URL);
        echo "[" . date('H:i:s') . "] 웹소켓 연결 성공\n";

        // Subscription request (orderbook depth 200). array_values() keeps `args` a
        // JSON array even when $clean_symbols has non-sequential keys.
        $topics = array_values(array_map(static fn($s) => "orderbook.200.{$s}", $clean_symbols));
        ws_send($sock, json_encode([
            'op'   => 'subscribe',
            'args' => $topics,
        ]));
        echo "[" . date('H:i:s') . "] 구독 요청: " . implode(', ', $clean_symbols) . "\n";

        // NOTE(review): no client-initiated heartbeat is sent. Confirm whether the
        // endpoint needs periodic pings to keep the connection open.

        // ── Main loop ──────────────────────────────────────────
        while (true) {
            $raw = ws_read($sock);

            if ($raw === null || $raw === '') {
                // Nothing was read. A closed socket also reads as "nothing", so check
                // for it explicitly; otherwise this loop would spin forever.
                if (stream_get_meta_data($sock)['eof']) {
                    throw new RuntimeException('웹소켓 연결이 종료되었습니다');
                }
            } else {
                $msg = json_decode($raw, true);
                if (is_array($msg)) {
                    if (($msg['op'] ?? null) === 'ping') {
                        // Application-level ping/pong
                        ws_send($sock, json_encode(['op' => 'pong']));
                    } else {
                        striker_apply_message($books, $msg);
                    }
                }
            }

            // ── DB save check (every 0.9 s) ────────────────────
            $now = microtime(true);
            if ($now - $lastSaved >= 0.9) {
                save_to_db($pdo, striker_export_books($books));
                $lastSaved = $now;
            }
        }

    } catch (Throwable $e) {
        echo "[" . date('H:i:s') . "] 타격기 오류: " . $e->getMessage() . "\n";
        if ($sock) fclose($sock);
        sleep(3);
    }
}
}