<?php
// ============================================================
// 타격기 : bybit_total_striker.php
// 웹소켓 연결 → 메모리 조립 → 0.9초마다 DB 저장
// 환경기에서 받는 값 : $pdo, $clean_symbols, $WS_URL
// ============================================================
// ── 1. 웹소켓 핸드셰이크 연결 ────────────────────────────────
if (!function_exists('ws_connect')) {
function ws_connect(string $url): mixed {
$parsed = parse_url($url);
$host = $parsed['host'];
$path = ($parsed['path'] ?? '/') . (isset($parsed['query']) ? '?' . $parsed['query'] : '');
$port = $parsed['port'] ?? 443;
$ctx = stream_context_create(['ssl' => ['verify_peer' => false, 'verify_peer_name' => false]]);
$sock = stream_socket_client("ssl://{$host}:{$port}", $errno, $errstr, 10, STREAM_CLIENT_CONNECT, $ctx);
if (!$sock) throw new RuntimeException("소켓 연결 실패: {$errstr} ({$errno})");
stream_set_timeout($sock, 0, 100000); // 0.1초 타임아웃 (논블로킹용)
// HTTP 업그레이드 요청
$key = base64_encode(random_bytes(16));
$handshake = "GET {$path} HTTP/1.1\r\n";
$handshake .= "Host: {$host}\r\n";
$handshake .= "Upgrade: websocket\r\n";
$handshake .= "Connection: Upgrade\r\n";
$handshake .= "Sec-WebSocket-Key: {$key}\r\n";
$handshake .= "Sec-WebSocket-Version: 13\r\n\r\n";
fwrite($sock, $handshake);
// 응답 확인
$response = '';
while (!feof($sock)) {
$response .= fread($sock, 4096);
if (strpos($response, "\r\n\r\n") !== false) break;
}
if (strpos($response, '101') === false) {
throw new RuntimeException("핸드셰이크 실패: " . substr($response, 0, 200));
}
return $sock;
}
}
// ── 2. 웹소켓 프레임 전송 ────────────────────────────────────
if (!function_exists('ws_send')) {
function ws_send(mixed $sock, string $data): void {
$len = strlen($data);
$mask = random_bytes(4);
$masked = '';
for ($i = 0; $i < $len; $i++) {
$masked .= $data[$i] ^ $mask[$i % 4];
}
$header = "\x81"; // FIN + text frame
if ($len < 126) {
$header .= chr(0x80 | $len);
} elseif ($len < 65536) {
$header .= "\xFE" . pack('n', $len);
} else {
$header .= "\xFF" . pack('J', $len);
}
fwrite($sock, $header . $mask . $masked);
}
}
// ── 3. 웹소켓 프레임 수신 ────────────────────────────────────
if (!function_exists('ws_read')) {
function ws_read(mixed $sock): ?string {
$head = fread($sock, 2);
if ($head === false || strlen($head) < 2) return null;
$byte2 = ord($head[1]);
$masked = ($byte2 & 0x80) !== 0;
$payloadLen = $byte2 & 0x7F;
if ($payloadLen === 126) {
$ext = fread($sock, 2);
$payloadLen = unpack('n', $ext)[1];
} elseif ($payloadLen === 127) {
$ext = fread($sock, 8);
$payloadLen = unpack('J', $ext)[1];
}
$maskKey = $masked ? fread($sock, 4) : '';
$payload = '';
$remaining = $payloadLen;
while ($remaining > 0) {
$chunk = fread($sock, min($remaining, 8192));
if ($chunk === false || $chunk === '') break;
$payload .= $chunk;
$remaining -= strlen($chunk);
}
if ($masked) {
$unmasked = '';
for ($i = 0; $i < strlen($payload); $i++) {
$unmasked .= $payload[$i] ^ $maskKey[$i % 4];
}
return $unmasked;
}
return $payload;
}
}
// ── 4. DB UPDATE ─────────────────────────────────────────────
if (!function_exists('save_to_db')) {
function save_to_db(PDO $pdo, array $memory): void {
foreach ($memory as $symbol => $data) {
if (empty($data['bids']) && empty($data['asks'])) continue;
try {
$stmt = $pdo->prepare("
UPDATE daemon_bybit_Ticker
SET
bids = :bids,
asks = :asks
WHERE symbol = :symbol
");
$stmt->execute([
':bids' => json_encode($data['bids'], JSON_UNESCAPED_UNICODE),
':asks' => json_encode($data['asks'], JSON_UNESCAPED_UNICODE),
':symbol' => $symbol,
]);
} catch (Throwable $e) {
echo "[" . date('H:i:s') . "] DB 오류 [{$symbol}]: " . $e->getMessage() . "\n";
}
}
}
}
// ── 5. 메인 로직 (run_striker 함수로 감싸기) ──────────────────
if (!function_exists('run_striker')) {
function run_striker(PDO $pdo, array $clean_symbols, string $WS_URL): void {
if (empty($clean_symbols)) {
echo "[" . date('H:i:s') . "] 활성 심볼 없음. 타격기 종료.\n";
return;
}
$sock = null;
$memory = [];
$lastSaved = 0;
try {
$sock = ws_connect($WS_URL);
echo "[" . date('H:i:s') . "] 웹소켓 연결 성공\n";
// 구독 요청 (orderbook depth 200)
$topics = array_map(fn($s) => "orderbook.200.{$s}", $clean_symbols);
$subMsg = json_encode([
'op' => 'subscribe',
'args' => $topics,
]);
ws_send($sock, $subMsg);
echo "[" . date('H:i:s') . "] 구독 요청: " . implode(', ', $clean_symbols) . "\n";
// ── 메인 루프 ──────────────────────────────────────────
while (true) {
$raw = ws_read($sock);
if ($raw !== null && $raw !== '') {
$msg = json_decode($raw, true);
if (!is_array($msg)) goto save_check;
// ping/pong 처리
if (isset($msg['op']) && $msg['op'] === 'ping') {
ws_send($sock, json_encode(['op' => 'pong']));
goto save_check;
}
// 데이터 메시지 처리
$type = $msg['type'] ?? null;
$symbol = $msg['data']['s'] ?? ($msg['data']['symbol'] ?? null);
if ($symbol === null) goto save_check;
$bids = $msg['data']['b'] ?? $msg['data']['bids'] ?? null;
$asks = $msg['data']['a'] ?? $msg['data']['asks'] ?? null;
if ($type === 'snapshot') {
if ($bids !== null) $memory[$symbol]['bids'] = $bids;
if ($asks !== null) $memory[$symbol]['asks'] = $asks;
} elseif ($type === 'delta') {
if ($bids !== null) {
foreach ($bids as $bid) {
$price = $bid[0];
if ($bid[1] == '0') {
$memory[$symbol]['bids'] = array_values(
array_filter($memory[$symbol]['bids'] ?? [], fn($b) => $b[0] !== $price)
);
} else {
$found = false;
foreach ($memory[$symbol]['bids'] as &$b) {
if ($b[0] === $price) { $b[1] = $bid[1]; $found = true; break; }
}
unset($b);
if (!$found) $memory[$symbol]['bids'][] = $bid;
}
}
}
if ($asks !== null) {
foreach ($asks as $ask) {
$price = $ask[0];
if ($ask[1] == '0') {
$memory[$symbol]['asks'] = array_values(
array_filter($memory[$symbol]['asks'] ?? [], fn($a) => $a[0] !== $price)
);
} else {
$found = false;
foreach ($memory[$symbol]['asks'] as &$a) {
if ($a[0] === $price) { $a[1] = $ask[1]; $found = true; break; }
}
unset($a);
if (!$found) $memory[$symbol]['asks'][] = $ask;
}
}
}
}
}
// ── DB 저장 체크 (0.9초마다) ───────────────────────
save_check:
$now = microtime(true);
if ($now - $lastSaved >= 0.9) {
save_to_db($pdo, $memory);
$lastSaved = $now;
}
} // end while
} catch (Throwable $e) {
echo "[" . date('H:i:s') . "] 타격기 오류: " . $e->getMessage() . "\n";
if ($sock) fclose($sock);
sleep(3);
}
}
}