using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text.Json;
using System.Text.Json.Serialization;
using Poe2Trade.Core;
using Serilog;

namespace Poe2Trade.Trade;

/// <summary>
/// Bridges the application to the external trade daemon script
/// (<c>tools/trade-daemon/daemon.mjs</c>, launched with <c>node</c>).
/// </summary>
/// <remarks>
/// The protocol is newline-delimited JSON over the child process's standard streams:
/// <list type="bullet">
/// <item>Commands are written to stdin as <c>{ "reqId": n, "cmd": "...", ...params }</c>.</item>
/// <item>Replies arrive on stdout as <c>{ "type": "response", "reqId": n, "ok": bool, ... }</c>.</item>
/// <item>Unsolicited notifications arrive on stdout as <c>{ "type": "event", "event": "...", ... }</c>.</item>
/// <item>Everything the daemon prints to stderr is forwarded to the debug log.</item>
/// </list>
/// </remarks>
public class TradeDaemonBridge : ITradeMonitor
{
    private static readonly JsonSerializerOptions JsonOpts = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
    };

    // Matches a trailing "/live" or "/live/" on a trade URL.
    private static readonly System.Text.RegularExpressions.Regex LiveSuffix = new(@"/live/?$");

    private static readonly TimeSpan ReadyTimeout = TimeSpan.FromSeconds(15);
    private static readonly TimeSpan ResponseTimeout = TimeSpan.FromSeconds(60);
    private static readonly TimeSpan StopTimeout = TimeSpan.FromSeconds(5);

    private Process? _proc;
    private int _reqCounter;

    // In-flight requests keyed by reqId; completed by the stdout read loop.
    private readonly ConcurrentDictionary<int, TaskCompletionSource<JsonElement>> _pending = new();

    // Several requests may be in flight at once, and StreamWriter does not support
    // overlapping asynchronous writes, so command writes are serialized.
    private readonly SemaphoreSlim _writeLock = new(1, 1);

    // Prevents two callers from racing through the check-then-spawn sequence.
    private readonly object _spawnGate = new();

    private readonly SavedSettings _config;
    private readonly string _daemonScript;
    private readonly string _nodeExe;

    /// <summary>
    /// Raised from the background read loop when the daemon reports a non-empty
    /// <c>newListings</c> event. Arguments are the search id and the listed item ids.
    /// </summary>
    public event Action<string, List<string>>? NewListings;

    /// <summary>Creates a bridge; the daemon process itself is spawned lazily on first use.</summary>
    /// <param name="config">Settings supplying <c>BrowserUserDataDir</c> and <c>Headless</c>.</param>
    public TradeDaemonBridge(SavedSettings config)
    {
        _config = config;
        _daemonScript = Path.GetFullPath(Path.Combine("tools", "trade-daemon", "daemon.mjs"));
        _nodeExe = "node";
    }

    /// <summary>Ensures the daemon is running and sends it the <c>start</c> command.</summary>
    /// <param name="dashboardUrl">Optional value forwarded as <c>dashboardUrl</c>; omitted from the command when null.</param>
    public async Task Start(string? dashboardUrl = null)
    {
        EnsureDaemonRunning();

        var userDataDir = Path.GetFullPath(_config.BrowserUserDataDir);
        await SendCommand("start", new
        {
            browserUserDataDir = userDataDir,
            headless = _config.Headless,
            dashboardUrl,
        });

        Log.Information("Trade daemon browser started");
    }

    /// <summary>Sends <c>addSearch</c> for the given trade URL.</summary>
    public async Task AddSearch(string tradeUrl)
    {
        EnsureDaemonRunning();
        await SendCommand("addSearch", new { url = tradeUrl });
    }

    /// <summary>Sends <c>pauseSearch</c> for the given search id.</summary>
    public async Task PauseSearch(string searchId)
    {
        EnsureDaemonRunning();
        await SendCommand("pauseSearch", new { searchId });
    }

    /// <summary>Sends <c>clickTravel</c> and reports whether the daemon says the click happened.</summary>
    /// <param name="pageId">Page identifier forwarded to the daemon.</param>
    /// <param name="itemId">Optional item identifier; omitted from the command when null.</param>
    /// <returns><c>true</c> only when the response carries <c>"clicked": true</c>.</returns>
    public async Task<bool> ClickTravelToHideout(string pageId, string? itemId = null)
    {
        EnsureDaemonRunning();
        var resp = await SendCommand("clickTravel", new { pageId, itemId });

        // Compare the value kind rather than calling GetBoolean(), which throws on non-boolean values.
        return resp.TryGetProperty("clicked", out var clicked) && clicked.ValueKind == JsonValueKind.True;
    }

    /// <summary>Sends <c>openScrapPage</c> and returns the new page's id with its parsed items.</summary>
    public async Task<(string ScrapId, List<TradeItem> Items)> OpenScrapPage(string tradeUrl)
    {
        EnsureDaemonRunning();
        var resp = await SendCommand("openScrapPage", new { url = tradeUrl });
        var scrapId = resp.GetProperty("scrapId").GetString()!;
        var items = ParseItems(resp);
        return (scrapId, items);
    }

    /// <summary>Sends <c>reloadScrapPage</c> and returns the items parsed from the response.</summary>
    public async Task<List<TradeItem>> ReloadScrapPage(string scrapId)
    {
        EnsureDaemonRunning();
        var resp = await SendCommand("reloadScrapPage", new { scrapId });
        return ParseItems(resp);
    }

    /// <summary>Sends <c>closeScrapPage</c> for the given scrap page id.</summary>
    public async Task CloseScrapPage(string scrapId)
    {
        EnsureDaemonRunning();
        await SendCommand("closeScrapPage", new { scrapId });
    }

    /// <summary>
    /// Returns the last '/'-separated segment of <paramref name="url"/> after removing a
    /// trailing <c>/live</c> or <c>/live/</c>; e.g. <c>".../abc123/live"</c> yields <c>"abc123"</c>.
    /// </summary>
    public string ExtractSearchId(string url)
    {
        var cleaned = LiveSuffix.Replace(url, "");

        // Split always yields at least one element, so the last segment is always available.
        return cleaned.Split('/')[^1];
    }

    /// <summary>
    /// Asks the daemon to stop, waits up to five seconds for it to exit, kills it if it is
    /// still alive, and cancels every request that is still pending.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        var proc = _proc;
        if (proc != null)
        {
            try
            {
                if (!proc.HasExited)
                {
                    // Best-effort stop command. It deliberately bypasses _writeLock so that
                    // disposal cannot queue behind a stalled write; any failure ends in Kill().
                    var reqId = Interlocked.Increment(ref _reqCounter);
                    var msg = JsonSerializer.Serialize(new { reqId, cmd = "stop" }, JsonOpts);
                    await proc.StandardInput.WriteLineAsync(msg);
                    await proc.StandardInput.FlushAsync();

                    // Asynchronous wait: a timeout surfaces as OperationCanceledException below.
                    using var stopCts = new CancellationTokenSource(StopTimeout);
                    await proc.WaitForExitAsync(stopCts.Token);
                }
            }
            catch (Exception ex)
            {
                Log.Debug(ex, "Trade daemon did not stop gracefully");
            }

            try
            {
                if (!proc.HasExited)
                    proc.Kill();
            }
            catch (Exception ex)
            {
                Log.Debug(ex, "Failed to kill trade daemon");
            }

            proc.Dispose();
        }

        _proc = null;

        // Complete any pending requests
        foreach (var kv in _pending)
        {
            kv.Value.TrySetCanceled();
            _pending.TryRemove(kv.Key, out _);
        }

        Log.Information("Trade daemon stopped");
    }

    /// <summary>
    /// Sends one command line to the daemon and awaits the response with the matching reqId.
    /// </summary>
    /// <param name="cmd">Command name written as the <c>cmd</c> property.</param>
    /// <param name="parameters">
    /// Optional object whose serialized (camel-cased, nulls omitted) properties are merged
    /// into the command alongside <c>reqId</c> and <c>cmd</c>.
    /// </param>
    /// <returns>The full response object as a detached <see cref="JsonElement"/>.</returns>
    /// <exception cref="InvalidOperationException">The daemon process is not running.</exception>
    /// <exception cref="TaskCanceledException">No response arrived within 60 seconds, or the bridge was disposed.</exception>
    /// <exception cref="Exception">The daemon answered with <c>ok: false</c> or exited while the request was pending.</exception>
    private async Task<JsonElement> SendCommand(string cmd, object? parameters = null)
    {
        // Work on a snapshot so a concurrent DisposeAsync cannot null the field mid-call.
        var proc = _proc;
        if (proc == null || proc.HasExited)
            throw new InvalidOperationException("Trade daemon is not running");

        var reqId = Interlocked.Increment(ref _reqCounter);
        var tcs = new TaskCompletionSource<JsonElement>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Build command object: merge reqId + cmd + params
        var dict = new Dictionary<string, object?> { ["reqId"] = reqId, ["cmd"] = cmd };
        if (parameters != null)
        {
            var paramJson = JsonSerializer.SerializeToElement(parameters, JsonOpts);
            foreach (var prop in paramJson.EnumerateObject())
                dict[prop.Name] = prop.Value;
        }

        var json = JsonSerializer.Serialize(dict, JsonOpts);

        _pending[reqId] = tcs;
        try
        {
            // The write is inside the try so a failed write cannot leak the pending entry.
            await WriteCommandLineAsync(proc, json);

            // Await response with timeout
            using var cts = new CancellationTokenSource(ResponseTimeout);
            cts.Token.Register(() => tcs.TrySetCanceled());
            return await tcs.Task;
        }
        finally
        {
            _pending.TryRemove(reqId, out _);
        }
    }

    /// <summary>Writes one line to the daemon's stdin and flushes it, one writer at a time.</summary>
    private async Task WriteCommandLineAsync(Process proc, string line)
    {
        await _writeLock.WaitAsync();
        try
        {
            await proc.StandardInput.WriteLineAsync(line);
            await proc.StandardInput.FlushAsync();
        }
        finally
        {
            _writeLock.Release();
        }
    }

    /// <summary>
    /// Spawns the daemon if there is no live process, blocks until its first stdout line
    /// announces <c>"type": "ready"</c>, then starts the background stdout read loop.
    /// </summary>
    /// <exception cref="FileNotFoundException">The daemon script does not exist.</exception>
    /// <exception cref="TimeoutException">No line was received within 15 seconds.</exception>
    /// <exception cref="Exception">The daemon closed stdout early or its first line was not the ready signal.</exception>
    private void EnsureDaemonRunning()
    {
        lock (_spawnGate)
        {
            if (_proc != null && !_proc.HasExited)
                return;

            _proc?.Dispose();
            _proc = null;

            if (!File.Exists(_daemonScript))
                throw new FileNotFoundException($"Trade daemon not found at {_daemonScript}");

            Log.Information("Spawning trade daemon: {Node} {Script}", _nodeExe, _daemonScript);

            var proc = new Process
            {
                StartInfo = new ProcessStartInfo
                {
                    FileName = _nodeExe,
                    Arguments = $"\"{_daemonScript}\"",
                    UseShellExecute = false,
                    RedirectStandardInput = true,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    CreateNoWindow = true,
                }
            };

            proc.ErrorDataReceived += (_, e) =>
            {
                if (!string.IsNullOrEmpty(e.Data))
                    Log.Debug("[trade-daemon] {Line}", e.Data);
            };

            try
            {
                proc.Start();
                proc.BeginErrorReadLine();

                // Wait for ready signal (up to 15s)
                var readyTask = Task.Run(() => proc.StandardOutput.ReadLine());
                if (!readyTask.Wait(ReadyTimeout))
                    throw new TimeoutException("Trade daemon did not send ready signal within 15s");

                var readyLine = readyTask.Result
                    ?? throw new Exception("Trade daemon exited before ready signal");

                // JsonDocument is disposable; release it once the check is done.
                using var readyDoc = JsonDocument.Parse(readyLine);
                if (!readyDoc.RootElement.TryGetProperty("type", out var typeProp) || typeProp.GetString() != "ready")
                    throw new Exception($"Trade daemon did not send ready signal: {readyLine}");
            }
            catch
            {
                try
                {
                    if (!proc.HasExited)
                        proc.Kill();
                }
                catch
                {
                    /* best effort */
                }

                proc.Dispose();
                throw;
            }

            _proc = proc;

            // Start background reader
            _ = Task.Run(() => ReadLoop(proc));

            Log.Information("Trade daemon ready");
        }
    }

    /// <summary>
    /// Reads the daemon's stdout line by line until the stream ends, dispatching each line,
    /// then fails every request that is still pending.
    /// </summary>
    private void ReadLoop(Process proc)
    {
        try
        {
            var stdout = proc.StandardOutput;

            // Read until end of stream instead of polling HasExited, so lines still buffered
            // when the daemon exits are delivered.
            string? line;
            while ((line = stdout.ReadLine()) != null)
            {
                try
                {
                    DispatchLine(line);
                }
                catch (Exception ex)
                {
                    Log.Debug("Failed to parse daemon output: {Line} - {Error}", line, ex.Message);
                }
            }
        }
        catch (Exception ex)
        {
            Log.Warning(ex, "Trade daemon read loop ended");
        }

        // Daemon exited — fail all pending requests
        foreach (var kv in _pending)
        {
            kv.Value.TrySetException(new Exception("Trade daemon process exited"));
            _pending.TryRemove(kv.Key, out _);
        }
    }

    /// <summary>Parses one stdout line and routes it as a response or an event.</summary>
    private void DispatchLine(string line)
    {
        using var doc = JsonDocument.Parse(line);
        var root = doc.RootElement;
        var type = root.GetProperty("type").GetString();

        if (type == "response")
        {
            var reqId = root.GetProperty("reqId").GetInt32();
            if (!_pending.TryGetValue(reqId, out var tcs))
                return;

            if (root.GetProperty("ok").GetBoolean())
            {
                // Clone so the element stays valid after the document is disposed.
                tcs.TrySetResult(root.Clone());
            }
            else
            {
                var error = root.TryGetProperty("error", out var e)
                    ? e.GetString() ?? "Unknown error"
                    : "Unknown error";
                tcs.TrySetException(new Exception($"Trade daemon error: {error}"));
            }
        }
        else if (type == "event")
        {
            HandleEvent(root);
        }
    }

    /// <summary>
    /// Handles a daemon event: raises <see cref="NewListings"/> for non-empty
    /// <c>newListings</c>, logs <c>wsClose</c>, and logs any other event name at debug level.
    /// </summary>
    private void HandleEvent(JsonElement root)
    {
        var eventName = root.GetProperty("event").GetString();
        switch (eventName)
        {
            case "newListings":
            {
                var searchId = root.GetProperty("searchId").GetString()!;

                // JSON null entries are dropped; non-string entries make GetString() throw.
                var itemIds = root.GetProperty("itemIds").EnumerateArray()
                    .Select(e => e.GetString())
                    .OfType<string>()
                    .ToList();

                if (itemIds.Count > 0)
                {
                    Log.Information("New listings from daemon: {SearchId} ({Count} items)", searchId, itemIds.Count);
                    NewListings?.Invoke(searchId, itemIds);
                }

                break;
            }

            case "wsClose":
            {
                var closedId = root.GetProperty("searchId").GetString()!;
                Log.Warning("WebSocket closed (daemon): {SearchId}", closedId);
                break;
            }

            default:
                Log.Debug("Unknown daemon event: {Event}", eventName);
                break;
        }
    }

    /// <summary>
    /// Builds <see cref="TradeItem"/> instances from the response's <c>items</c> array.
    /// Returns an empty list when the property is missing or is not an array.
    /// </summary>
    /// <remarks>
    /// <c>id</c> is required on every element. Defaults for absent properties:
    /// <c>w</c> and <c>h</c> = 1, <c>stashX</c> and <c>stashY</c> = 0, <c>account</c> = "".
    /// </remarks>
    private static List<TradeItem> ParseItems(JsonElement resp)
    {
        var items = new List<TradeItem>();
        if (!resp.TryGetProperty("items", out var arr) || arr.ValueKind != JsonValueKind.Array)
            return items;

        foreach (var el in arr.EnumerateArray())
        {
            items.Add(new TradeItem(
                el.GetProperty("id").GetString() ?? "",
                el.TryGetProperty("w", out var w) ? w.GetInt32() : 1,
                el.TryGetProperty("h", out var h) ? h.GetInt32() : 1,
                el.TryGetProperty("stashX", out var sx) ? sx.GetInt32() : 0,
                el.TryGetProperty("stashY", out var sy) ? sy.GetInt32() : 0,
                el.TryGetProperty("account", out var acc) ? acc.GetString() ?? "" : ""
            ));
        }

        return items;
    }
}