// Copyright 2014 The Flutter Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. import 'dart:async'; import 'base/common.dart'; import 'base/io.dart'; import 'base/logger.dart'; import 'base/utils.dart'; import 'convert.dart'; /// Parse binary streams in the JSON RPC format understood by the daemon, and /// convert it into a stream of JSON RPC messages. Stream> _convertInputStream(Stream> inputStream) { return utf8.decoder.bind(inputStream) .transform(const LineSplitter()) .where((String line) => line.startsWith('[{') && line.endsWith('}]')) .map?>((String line) { line = line.substring(1, line.length - 1); return castStringKeyedMap(json.decode(line)); }) .where((Map? entry) => entry != null) .cast>(); } /// A stream that a [DaemonConnection] uses to communicate with each other. abstract class DaemonStreams { /// Stream that contains input to the [DaemonConnection]. Stream> get inputStream; /// Outputs a message through the connection. void send(Map message); /// Cleans up any resources used. Future dispose() async { } } /// A [DaemonStream] that uses stdin and stdout as the underlying streams. class StdioDaemonStreams extends DaemonStreams { StdioDaemonStreams(Stdio stdio) : _stdio = stdio, inputStream = _convertInputStream(stdio.stdin); final Stdio _stdio; @override final Stream> inputStream; @override void send(Map message) { _stdio.stdoutWrite( '[${json.encode(message)}]\n', fallback: (String message, Object? error, StackTrace stack) { throwToolExit('Failed to write daemon command response to stdout: $error'); }, ); } } /// A [DaemonStream] that uses [Socket] as the underlying stream. class TcpDaemonStreams extends DaemonStreams { /// Creates a [DaemonStreams] with an existing [Socket]. TcpDaemonStreams( Socket socket, { required Logger logger, }): _logger = logger { _socket = Future.value(_initializeSocket(socket)); } /// Connects to a remote host and creates a [DaemonStreams] from the socket. TcpDaemonStreams.connect( String host, int port, { required Logger logger, }) : _logger = logger { _socket = Socket.connect(host, port).then(_initializeSocket); } late final Future _socket; final StreamController> _commands = StreamController>(); final Logger _logger; @override Stream> get inputStream => _commands.stream; @override void send(Map message) { _socket.then((Socket socket) { try { socket.write('[${json.encode(message)}]\n'); } on SocketException catch (error) { _logger.printError('Failed to write daemon command response to socket: $error'); // Failed to send, close the connection socket.close(); } }); } Socket _initializeSocket(Socket socket) { _commands.addStream(_convertInputStream(socket)); return socket; } @override Future dispose() async { await (await _socket).close(); } } /// Connection between a flutter daemon and a client. class DaemonConnection { DaemonConnection({ required DaemonStreams daemonStreams, required Logger logger, }): _logger = logger, _daemonStreams = daemonStreams { _commandSubscription = daemonStreams.inputStream.listen( _handleData, onError: (Object error, StackTrace stackTrace) { // We have to listen for on error otherwise the error on the socket // will end up in the Zone error handler. // Do nothing here and let the stream close handlers handle shutting // down the daemon. } ); } final DaemonStreams _daemonStreams; final Logger _logger; late final StreamSubscription> _commandSubscription; int _outgoingRequestId = 0; final Map> _outgoingRequestCompleters = >{}; final StreamController> _events = StreamController>.broadcast(); final StreamController> _incomingCommands = StreamController>(); /// A stream that contains all the incoming requests. Stream> get incomingCommands => _incomingCommands.stream; /// Listens to the event with the event name [eventToListen]. Stream listenToEvent(String eventToListen) { return _events.stream .where((Map event) => event['event'] == eventToListen) .map((Map event) => event['params']); } /// Sends a request to the other end of the connection. /// /// Returns a [Future] that resolves with the content. Future sendRequest(String method, [Object? params]) async { final String id = '${++_outgoingRequestId}'; final Completer completer = Completer(); _outgoingRequestCompleters[id] = completer; final Map data = { 'id': id, 'method': method, if (params != null) 'params': params, }; _logger.printTrace('-> Sending to daemon, id = $id, method = $method'); _daemonStreams.send(data); return completer.future; } /// Sends a response to the other end of the connection. void sendResponse(Object id, [Object? result]) { _daemonStreams.send({ 'id': id, if (result != null) 'result': result, }); } /// Sends an error response to the other end of the connection. void sendErrorResponse(Object id, Object error, StackTrace trace) { _daemonStreams.send({ 'id': id, 'error': error, 'trace': '$trace', }); } /// Sends an event to the client. void sendEvent(String name, [ Object? params ]) { _daemonStreams.send({ 'event': name, if (params != null) 'params': params, }); } /// Handles the input from the stream. /// /// There are three kinds of data: Request, Response, Event. /// /// Request: /// {"id": . "method": , "params": } /// /// Response: /// {"id": . "result": } for a successful response. /// {"id": . "error": , "stackTrace": } for an error response. /// /// Event: /// {"event": . "params": } void _handleData(Map data) { if (data['id'] != null) { if (data['method'] == null) { // This is a response to previously sent request. final String id = data['id']! as String; if (data['error'] != null) { // This is an error response. _logger.printTrace('<- Error response received from daemon, id = $id'); final Object error = data['error']!; final String stackTrace = data['stackTrace'] as String? ?? ''; _outgoingRequestCompleters.remove(id)?.completeError(error, StackTrace.fromString(stackTrace)); } else { _logger.printTrace('<- Response received from daemon, id = $id'); final Object? result = data['result']; _outgoingRequestCompleters.remove(id)?.complete(result); } } else { _incomingCommands.add(data); } } else if (data['event'] != null) { // This is an event _logger.printTrace('<- Event received: ${data['event']}'); _events.add(data); } else { _logger.printError('Unknown data received from daemon'); } } /// Cleans up any resources used in the connection. Future dispose() async { await _commandSubscription.cancel(); await _daemonStreams.dispose(); unawaited(_events.close()); unawaited(_incomingCommands.close()); } }