summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart')
-rw-r--r--src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart311
1 files changed, 311 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart b/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart
new file mode 100644
index 000000000..90bffbe54
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+library thrift.test.transport.t_socket_transport_test;
+
+import 'dart:async';
+import 'dart:convert' show Utf8Codec;
+import 'dart:typed_data' show Uint8List;
+
+import 'package:dart2_constant/convert.dart' show base64;
+import 'package:dart2_constant/core.dart' as core;
+import 'package:mockito/mockito.dart';
+import 'package:test/test.dart';
+import 'package:thrift/thrift.dart';
+
+void main() {
+ const utf8Codec = const Utf8Codec();
+
+ final requestText = 'my test request';
+ final requestBytes = new Uint8List.fromList(utf8Codec.encode(requestText));
+ final requestBase64 = base64.encode(requestBytes);
+
+ final responseText = 'response 1';
+ final responseBytes = new Uint8List.fromList(utf8Codec.encode(responseText));
+ final responseBase64 = base64.encode(responseBytes);
+
+ final framedResponseBase64 = base64.encode(_getFramedResponse(responseBytes));
+
+ group('TClientSocketTransport', () {
+ FakeSocket socket;
+ TTransport transport;
+
+ setUp(() async {
+ socket = new FakeSocket(sync: false);
+ await socket.open();
+ transport = new TClientSocketTransport(socket);
+ await transport.open();
+ transport.writeAll(requestBytes);
+ });
+
+ test('Test client sending data over transport', () async {
+ expect(socket.sendPayload, isNull);
+
+ Future responseReady = transport.flush();
+
+ // allow microtask events to finish
+ await new Future.value();
+
+ expect(socket.sendPayload, isNotNull);
+ expect(socket.sendPayload, requestBytes);
+
+ // simulate a response
+ socket.receiveFakeMessage(responseBase64);
+
+ await responseReady;
+ var buffer = new Uint8List(responseBytes.length);
+ transport.readAll(buffer, 0, responseBytes.length);
+ var bufferText = utf8Codec.decode(buffer);
+
+ expect(bufferText, responseText);
+ });
+ }, timeout: new Timeout(new Duration(seconds: 1)));
+
+ group('TClientSocketTransport with FramedTransport', () {
+ FakeSocket socket;
+ TTransport transport;
+
+ setUp(() async {
+ socket = new FakeSocket(sync: true);
+ await socket.open();
+
+ transport = new TFramedTransport(new TClientSocketTransport(socket));
+ await transport.open();
+ transport.writeAll(requestBytes);
+ });
+
+ test('Test client sending data over framed transport', () async {
+ String bufferText;
+
+ Future responseReady = transport.flush().then((_) {
+ var buffer = new Uint8List(responseBytes.length);
+ transport.readAll(buffer, 0, responseBytes.length);
+ bufferText = utf8Codec.decode(buffer);
+ });
+
+ // simulate a response
+ socket.receiveFakeMessage(framedResponseBase64);
+
+ await responseReady;
+ expect(bufferText, responseText);
+ });
+ }, timeout: new Timeout(new Duration(seconds: 1)));
+
+ group('TAsyncClientSocketTransport', () {
+ FakeSocket socket;
+ FakeProtocolFactory protocolFactory;
+ TTransport transport;
+
+ setUp(() async {
+ socket = new FakeSocket(sync: true);
+ await socket.open();
+
+ protocolFactory = new FakeProtocolFactory();
+ protocolFactory.message = new TMessage('foo', TMessageType.CALL, 123);
+ transport = new TAsyncClientSocketTransport(
+ socket, new TMessageReader(protocolFactory),
+ responseTimeout: core.Duration.zero);
+ await transport.open();
+ transport.writeAll(requestBytes);
+ });
+
+ test('Test response correlates to correct request', () async {
+ String bufferText;
+
+ Future responseReady = transport.flush().then((_) {
+ var buffer = new Uint8List(responseBytes.length);
+ transport.readAll(buffer, 0, responseBytes.length);
+ bufferText = utf8Codec.decode(buffer);
+ });
+
+ // simulate a response
+ protocolFactory.message = new TMessage('foo', TMessageType.REPLY, 123);
+ socket.receiveFakeMessage(responseBase64);
+
+ // simulate a second response
+ var response2Text = 'response 2';
+ var response2Bytes =
+ new Uint8List.fromList(utf8Codec.encode(response2Text));
+ var response2Base64 = base64.encode(response2Bytes);
+ protocolFactory.message = new TMessage('foo2', TMessageType.REPLY, 124);
+ socket.receiveFakeMessage(response2Base64);
+
+ await responseReady;
+ expect(bufferText, responseText);
+ });
+
+ test('Test response timeout', () async {
+ Future responseReady = transport.flush();
+ expect(responseReady, throwsA(new isInstanceOf<TimeoutException>()));
+ });
+ }, timeout: new Timeout(new Duration(seconds: 1)));
+
+ group('TAsyncClientSocketTransport with TFramedTransport', () {
+ FakeSocket socket;
+ FakeProtocolFactory protocolFactory;
+ TTransport transport;
+
+ setUp(() async {
+ socket = new FakeSocket(sync: true);
+ await socket.open();
+
+ protocolFactory = new FakeProtocolFactory();
+ protocolFactory.message = new TMessage('foo', TMessageType.CALL, 123);
+ var messageReader = new TMessageReader(protocolFactory,
+ byteOffset: TFramedTransport.headerByteCount);
+
+ transport = new TFramedTransport(new TAsyncClientSocketTransport(
+ socket, messageReader,
+ responseTimeout: core.Duration.zero));
+ await transport.open();
+ transport.writeAll(requestBytes);
+ });
+
+ test('Test async client sending data over framed transport', () async {
+ String bufferText;
+
+ Future responseReady = transport.flush().then((_) {
+ var buffer = new Uint8List(responseBytes.length);
+ transport.readAll(buffer, 0, responseBytes.length);
+ bufferText = utf8Codec.decode(buffer);
+ });
+
+ // simulate a response
+ protocolFactory.message = new TMessage('foo', TMessageType.REPLY, 123);
+ socket.receiveFakeMessage(framedResponseBase64);
+
+ await responseReady;
+ expect(bufferText, responseText);
+ });
+ }, timeout: new Timeout(new Duration(seconds: 1)));
+
+ group('TServerTransport', () {
+ test('Test server transport listens to socket', () async {
+ var socket = new FakeSocket();
+ await socket.open();
+ expect(socket.isOpen, isTrue);
+
+ var transport = new TServerSocketTransport(socket);
+ expect(transport.hasReadData, isFalse);
+
+ socket.receiveFakeMessage(requestBase64);
+
+ // allow microtask events to finish
+ await new Future.value();
+
+ expect(transport.hasReadData, isTrue);
+
+ var buffer = new Uint8List(requestBytes.length);
+ transport.readAll(buffer, 0, requestBytes.length);
+
+ var bufferText = utf8Codec.decode(buffer);
+ expect(bufferText, requestText);
+ });
+
+ test('Test server sending data over transport', () async {
+ var socket = new FakeSocket();
+ await socket.open();
+
+ var transport = new TServerSocketTransport(socket);
+
+ transport.writeAll(responseBytes);
+ expect(socket.sendPayload, isNull);
+
+ transport.flush();
+
+ // allow microtask events to finish
+ await new Future.value();
+
+ expect(socket.sendPayload, isNotNull);
+ expect(socket.sendPayload, responseBytes);
+ });
+ }, timeout: new Timeout(new Duration(seconds: 1)));
+}
+
+class FakeSocket extends TSocket {
+ final StreamController<TSocketState> _onStateController;
+ Stream<TSocketState> get onState => _onStateController.stream;
+
+ final StreamController<Object> _onErrorController;
+ Stream<Object> get onError => _onErrorController.stream;
+
+ final StreamController<Uint8List> _onMessageController;
+ Stream<Uint8List> get onMessage => _onMessageController.stream;
+
+ FakeSocket({bool sync: false})
+ : _onStateController = new StreamController.broadcast(sync: sync),
+ _onErrorController = new StreamController.broadcast(sync: sync),
+ _onMessageController = new StreamController.broadcast(sync: sync);
+
+ bool _isOpen;
+
+ bool get isOpen => _isOpen;
+
+ bool get isClosed => !isOpen;
+
+ Future open() async {
+ _isOpen = true;
+ _onStateController.add(TSocketState.OPEN);
+ }
+
+ Future close() async {
+ _isOpen = false;
+ _onStateController.add(TSocketState.CLOSED);
+ }
+
+ Uint8List _sendPayload;
+ Uint8List get sendPayload => _sendPayload;
+
+ void send(Uint8List data) {
+ if (!isOpen) throw new StateError('The socket is not open');
+
+ _sendPayload = data;
+ }
+
+ void receiveFakeMessage(String base64text) {
+ if (!isOpen) throw new StateError('The socket is not open');
+
+ var message = new Uint8List.fromList(base64.decode(base64text));
+ _onMessageController.add(message);
+ }
+}
+
+class FakeProtocolFactory implements TProtocolFactory {
+ FakeProtocolFactory();
+
+ TMessage message;
+
+ getProtocol(TTransport transport) => new FakeProtocol(message);
+}
+
+class FakeProtocol extends Mock implements TProtocol {
+ FakeProtocol(this._message);
+
+ TMessage _message;
+
+ readMessageBegin() => _message;
+}
+
+Uint8List _getFramedResponse(Uint8List responseBytes) {
+ var byteOffset = TFramedTransport.headerByteCount;
+ var response = new Uint8List(byteOffset + responseBytes.length);
+
+ response.buffer.asByteData().setInt32(0, responseBytes.length);
+ response.setAll(byteOffset, responseBytes);
+
+ return response;
+}