From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../test/transport/t_framed_transport_test.dart | 175 ++++++++++++ .../dart/test/transport/t_http_transport_test.dart | 164 +++++++++++ .../test/transport/t_socket_transport_test.dart | 311 +++++++++++++++++++++ .../lib/dart/test/transport/t_transport_test.dart | 41 +++ 4 files changed, 691 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/dart/test/transport/t_framed_transport_test.dart create mode 100644 src/jaegertracing/thrift/lib/dart/test/transport/t_http_transport_test.dart create mode 100644 src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart create mode 100644 src/jaegertracing/thrift/lib/dart/test/transport/t_transport_test.dart (limited to 'src/jaegertracing/thrift/lib/dart/test/transport') diff --git a/src/jaegertracing/thrift/lib/dart/test/transport/t_framed_transport_test.dart b/src/jaegertracing/thrift/lib/dart/test/transport/t_framed_transport_test.dart new file mode 100644 index 000000000..7ab490539 --- /dev/null +++ b/src/jaegertracing/thrift/lib/dart/test/transport/t_framed_transport_test.dart @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +library thrift.test.transport.t_framed_transport_test; + +import 'dart:async'; +import 'dart:typed_data' show Uint8List; + +import 'package:dart2_constant/convert.dart' show utf8; +import 'package:test/test.dart'; +import 'package:thrift/thrift.dart'; + +void main() { + group('TFramedTransport partial reads', () { + final flushAwaitDuration = new Duration(seconds: 10); + + FakeReadOnlySocket socket; + TSocketTransport socketTransport; + TFramedTransport transport; + var messageAvailable; + + setUp(() { + socket = new FakeReadOnlySocket(); + socketTransport = new TClientSocketTransport(socket); + transport = new TFramedTransport(socketTransport); + messageAvailable = false; + }); + + expectNoReadableBytes() { + var readBuffer = new Uint8List(128); + var readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes); + expect(readBytes, 0); + expect(messageAvailable, false); + } + + test('Test transport reads messages where header and body are sent separately', () async { + // buffer into which we'll read + var readBuffer = new Uint8List(10); + var readBytes; + + // registers for readable bytes + var flushFuture = transport.flush().timeout(flushAwaitDuration); + flushFuture.then((_) { + messageAvailable = true; + }); + + // write header bytes + socket.messageController.add(new Uint8List.fromList([0x00, 0x00, 0x00, 0x06])); + + // you shouldn't be able to get any bytes from the read, + // because the header has been consumed internally + expectNoReadableBytes(); + + // write first batch of body + socket.messageController.add(new Uint8List.fromList(utf8.encode("He"))); + + // you shouldn't be able to get any bytes from the read, + // because the frame has been consumed internally + expectNoReadableBytes(); + + // write second batch of body + socket.messageController.add(new Uint8List.fromList(utf8.encode("llo!"))); + + // have to wait for the flush to complete, + // because it's only then that the frame is available for reading + await flushFuture; + expect(messageAvailable, true); + + // at this point the frame is complete, so we expect the read to complete + readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes); + expect(readBytes, 6); + expect(readBuffer.sublist(0, 6), utf8.encode("Hello!")); + }); + + test('Test transport reads messages where header is sent in pieces ' + 'and body is also sent in pieces', () async { + // buffer into which we'll read + var readBuffer = new Uint8List(10); + var readBytes; + + // registers for readable bytes + var flushFuture = transport.flush().timeout(flushAwaitDuration); + flushFuture.then((_) { + messageAvailable = true; + }); + + // write first part of header bytes + socket.messageController.add(new Uint8List.fromList([0x00, 0x00])); + + // you shouldn't be able to get any bytes from the read + expectNoReadableBytes(); + + // write second part of header bytes + socket.messageController.add(new Uint8List.fromList([0x00, 0x03])); + + // you shouldn't be able to get any bytes from the read again + // because only the header was read, and there's no frame body + readBytes = expectNoReadableBytes(); + + // write first batch of body + socket.messageController.add(new Uint8List.fromList(utf8.encode("H"))); + + // you shouldn't be able to get any bytes from the read, + // because the frame has been consumed internally + expectNoReadableBytes(); + + // write second batch of body + socket.messageController.add(new Uint8List.fromList(utf8.encode("i!"))); + + // have to wait for the flush to complete, + // because it's only then that the frame is available for reading + await flushFuture; + expect(messageAvailable, true); + + // at this point the frame is complete, so we expect the read to complete + readBytes = transport.read(readBuffer, 0, readBuffer.lengthInBytes); + expect(readBytes, 3); + expect(readBuffer.sublist(0, 3), utf8.encode("Hi!")); + }); + }); +} + + + +class FakeReadOnlySocket extends TSocket { + + StreamController messageController = new StreamController(sync: true); + StreamController errorController = new StreamController(); + StreamController stateController = new StreamController(); + + @override + Future close() { + // noop + } + + @override + bool get isClosed => false; + + @override + bool get isOpen => true; + + @override + Stream get onError => errorController.stream; + + @override + Stream get onMessage => messageController.stream; + + @override + Stream get onState => stateController.stream; + + @override + Future open() { + // noop + } + + @override + void send(Uint8List data) { + // noop + } +} + diff --git a/src/jaegertracing/thrift/lib/dart/test/transport/t_http_transport_test.dart b/src/jaegertracing/thrift/lib/dart/test/transport/t_http_transport_test.dart new file mode 100644 index 000000000..03ccede9a --- /dev/null +++ b/src/jaegertracing/thrift/lib/dart/test/transport/t_http_transport_test.dart @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +library thrift.test.transport.t_socket_transport_test; + +import 'dart:async'; +import 'dart:convert' show Encoding; +import 'dart:convert' show Utf8Codec; +import 'dart:typed_data' show Uint8List; + +import 'package:dart2_constant/convert.dart' show base64; +import 'package:http/http.dart' show BaseRequest; +import 'package:http/http.dart' show Client; +import 'package:http/http.dart' show Response; +import 'package:http/http.dart' show StreamedResponse; +import 'package:test/test.dart'; +import 'package:thrift/thrift.dart'; + +void main() { + const utf8Codec = const Utf8Codec(); + + group('THttpClientTransport', () { + FakeHttpClient client; + THttpClientTransport transport; + + setUp(() { + client = new FakeHttpClient(sync: false); + var config = new THttpConfig(Uri.parse('http://localhost'), {}); + transport = new THttpClientTransport(client, config); + }); + + test('Test transport sends body', () async { + var expectedText = 'my request'; + transport.writeAll(utf8Codec.encode(expectedText)); + + expect(client.postRequest, isEmpty); + + await transport.flush(); + + expect(client.postRequest, isNotEmpty); + + var requestText = utf8Codec.decode(base64.decode(client.postRequest)); + expect(requestText, expectedText); + }); + + test('Test transport receives response', () async { + var expectedText = 'my response'; + var expectedBytes = utf8Codec.encode(expectedText); + client.postResponse = base64.encode(expectedBytes); + + transport.writeAll(utf8Codec.encode('my request')); + expect(transport.hasReadData, isFalse); + + await transport.flush(); + + expect(transport.hasReadData, isTrue); + + var buffer = new Uint8List(expectedBytes.length); + transport.readAll(buffer, 0, expectedBytes.length); + + var bufferText = utf8Codec.decode(buffer); + expect(bufferText, expectedText); + }); + }); + + group('THttpClientTransport with multiple messages', () { + FakeHttpClient client; + THttpClientTransport transport; + + setUp(() { + client = new FakeHttpClient(sync: true); + var config = new THttpConfig(Uri.parse('http://localhost'), {}); + transport = new THttpClientTransport(client, config); + }); + + test('Test read correct buffer after flush', () async { + String bufferText; + var expectedText = 'response 1'; + var expectedBytes = utf8Codec.encode(expectedText); + + // prepare a response + transport.writeAll(utf8Codec.encode('request 1')); + client.postResponse = base64.encode(expectedBytes); + + Future responseReady = transport.flush().then((_) { + var buffer = new Uint8List(expectedBytes.length); + transport.readAll(buffer, 0, expectedBytes.length); + bufferText = utf8Codec.decode(buffer); + }); + + // prepare a second response + transport.writeAll(utf8Codec.encode('request 2')); + var response2Bytes = utf8Codec.encode('response 2'); + client.postResponse = base64.encode(response2Bytes); + await transport.flush(); + + await responseReady; + expect(bufferText, expectedText); + }); + }); +} + +class FakeHttpClient implements Client { + String postResponse = ''; + String postRequest = ''; + + final bool sync; + + FakeHttpClient({this.sync: false}); + + Future post(url, + {Map headers, body, Encoding encoding}) { + postRequest = body; + var response = new Response(postResponse, 200); + + if (sync) { + return new Future.sync(() => response); + } else { + return new Future.value(response); + } + } + + Future head(url, {Map headers}) => + throw new UnimplementedError(); + + Future get(url, {Map headers}) => + throw new UnimplementedError(); + + Future put(url, + {Map headers, body, Encoding encoding}) => + throw new UnimplementedError(); + + Future patch(url, + {Map headers, body, Encoding encoding}) => + throw new UnimplementedError(); + + Future delete(url, {Map headers}) => + throw new UnimplementedError(); + + Future read(url, {Map headers}) => + throw new UnimplementedError(); + + Future readBytes(url, {Map headers}) => + throw new UnimplementedError(); + + Future send(BaseRequest request) => + throw new UnimplementedError(); + + void close() => throw new UnimplementedError(); +} diff --git a/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart b/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart new file mode 100644 index 000000000..90bffbe54 --- /dev/null +++ b/src/jaegertracing/thrift/lib/dart/test/transport/t_socket_transport_test.dart @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +library thrift.test.transport.t_socket_transport_test; + +import 'dart:async'; +import 'dart:convert' show Utf8Codec; +import 'dart:typed_data' show Uint8List; + +import 'package:dart2_constant/convert.dart' show base64; +import 'package:dart2_constant/core.dart' as core; +import 'package:mockito/mockito.dart'; +import 'package:test/test.dart'; +import 'package:thrift/thrift.dart'; + +void main() { + const utf8Codec = const Utf8Codec(); + + final requestText = 'my test request'; + final requestBytes = new Uint8List.fromList(utf8Codec.encode(requestText)); + final requestBase64 = base64.encode(requestBytes); + + final responseText = 'response 1'; + final responseBytes = new Uint8List.fromList(utf8Codec.encode(responseText)); + final responseBase64 = base64.encode(responseBytes); + + final framedResponseBase64 = base64.encode(_getFramedResponse(responseBytes)); + + group('TClientSocketTransport', () { + FakeSocket socket; + TTransport transport; + + setUp(() async { + socket = new FakeSocket(sync: false); + await socket.open(); + transport = new TClientSocketTransport(socket); + await transport.open(); + transport.writeAll(requestBytes); + }); + + test('Test client sending data over transport', () async { + expect(socket.sendPayload, isNull); + + Future responseReady = transport.flush(); + + // allow microtask events to finish + await new Future.value(); + + expect(socket.sendPayload, isNotNull); + expect(socket.sendPayload, requestBytes); + + // simulate a response + socket.receiveFakeMessage(responseBase64); + + await responseReady; + var buffer = new Uint8List(responseBytes.length); + transport.readAll(buffer, 0, responseBytes.length); + var bufferText = utf8Codec.decode(buffer); + + expect(bufferText, responseText); + }); + }, timeout: new Timeout(new Duration(seconds: 1))); + + group('TClientSocketTransport with FramedTransport', () { + FakeSocket socket; + TTransport transport; + + setUp(() async { + socket = new FakeSocket(sync: true); + await socket.open(); + + transport = new TFramedTransport(new TClientSocketTransport(socket)); + await transport.open(); + transport.writeAll(requestBytes); + }); + + test('Test client sending data over framed transport', () async { + String bufferText; + + Future responseReady = transport.flush().then((_) { + var buffer = new Uint8List(responseBytes.length); + transport.readAll(buffer, 0, responseBytes.length); + bufferText = utf8Codec.decode(buffer); + }); + + // simulate a response + socket.receiveFakeMessage(framedResponseBase64); + + await responseReady; + expect(bufferText, responseText); + }); + }, timeout: new Timeout(new Duration(seconds: 1))); + + group('TAsyncClientSocketTransport', () { + FakeSocket socket; + FakeProtocolFactory protocolFactory; + TTransport transport; + + setUp(() async { + socket = new FakeSocket(sync: true); + await socket.open(); + + protocolFactory = new FakeProtocolFactory(); + protocolFactory.message = new TMessage('foo', TMessageType.CALL, 123); + transport = new TAsyncClientSocketTransport( + socket, new TMessageReader(protocolFactory), + responseTimeout: core.Duration.zero); + await transport.open(); + transport.writeAll(requestBytes); + }); + + test('Test response correlates to correct request', () async { + String bufferText; + + Future responseReady = transport.flush().then((_) { + var buffer = new Uint8List(responseBytes.length); + transport.readAll(buffer, 0, responseBytes.length); + bufferText = utf8Codec.decode(buffer); + }); + + // simulate a response + protocolFactory.message = new TMessage('foo', TMessageType.REPLY, 123); + socket.receiveFakeMessage(responseBase64); + + // simulate a second response + var response2Text = 'response 2'; + var response2Bytes = + new Uint8List.fromList(utf8Codec.encode(response2Text)); + var response2Base64 = base64.encode(response2Bytes); + protocolFactory.message = new TMessage('foo2', TMessageType.REPLY, 124); + socket.receiveFakeMessage(response2Base64); + + await responseReady; + expect(bufferText, responseText); + }); + + test('Test response timeout', () async { + Future responseReady = transport.flush(); + expect(responseReady, throwsA(new isInstanceOf())); + }); + }, timeout: new Timeout(new Duration(seconds: 1))); + + group('TAsyncClientSocketTransport with TFramedTransport', () { + FakeSocket socket; + FakeProtocolFactory protocolFactory; + TTransport transport; + + setUp(() async { + socket = new FakeSocket(sync: true); + await socket.open(); + + protocolFactory = new FakeProtocolFactory(); + protocolFactory.message = new TMessage('foo', TMessageType.CALL, 123); + var messageReader = new TMessageReader(protocolFactory, + byteOffset: TFramedTransport.headerByteCount); + + transport = new TFramedTransport(new TAsyncClientSocketTransport( + socket, messageReader, + responseTimeout: core.Duration.zero)); + await transport.open(); + transport.writeAll(requestBytes); + }); + + test('Test async client sending data over framed transport', () async { + String bufferText; + + Future responseReady = transport.flush().then((_) { + var buffer = new Uint8List(responseBytes.length); + transport.readAll(buffer, 0, responseBytes.length); + bufferText = utf8Codec.decode(buffer); + }); + + // simulate a response + protocolFactory.message = new TMessage('foo', TMessageType.REPLY, 123); + socket.receiveFakeMessage(framedResponseBase64); + + await responseReady; + expect(bufferText, responseText); + }); + }, timeout: new Timeout(new Duration(seconds: 1))); + + group('TServerTransport', () { + test('Test server transport listens to socket', () async { + var socket = new FakeSocket(); + await socket.open(); + expect(socket.isOpen, isTrue); + + var transport = new TServerSocketTransport(socket); + expect(transport.hasReadData, isFalse); + + socket.receiveFakeMessage(requestBase64); + + // allow microtask events to finish + await new Future.value(); + + expect(transport.hasReadData, isTrue); + + var buffer = new Uint8List(requestBytes.length); + transport.readAll(buffer, 0, requestBytes.length); + + var bufferText = utf8Codec.decode(buffer); + expect(bufferText, requestText); + }); + + test('Test server sending data over transport', () async { + var socket = new FakeSocket(); + await socket.open(); + + var transport = new TServerSocketTransport(socket); + + transport.writeAll(responseBytes); + expect(socket.sendPayload, isNull); + + transport.flush(); + + // allow microtask events to finish + await new Future.value(); + + expect(socket.sendPayload, isNotNull); + expect(socket.sendPayload, responseBytes); + }); + }, timeout: new Timeout(new Duration(seconds: 1))); +} + +class FakeSocket extends TSocket { + final StreamController _onStateController; + Stream get onState => _onStateController.stream; + + final StreamController _onErrorController; + Stream get onError => _onErrorController.stream; + + final StreamController _onMessageController; + Stream get onMessage => _onMessageController.stream; + + FakeSocket({bool sync: false}) + : _onStateController = new StreamController.broadcast(sync: sync), + _onErrorController = new StreamController.broadcast(sync: sync), + _onMessageController = new StreamController.broadcast(sync: sync); + + bool _isOpen; + + bool get isOpen => _isOpen; + + bool get isClosed => !isOpen; + + Future open() async { + _isOpen = true; + _onStateController.add(TSocketState.OPEN); + } + + Future close() async { + _isOpen = false; + _onStateController.add(TSocketState.CLOSED); + } + + Uint8List _sendPayload; + Uint8List get sendPayload => _sendPayload; + + void send(Uint8List data) { + if (!isOpen) throw new StateError('The socket is not open'); + + _sendPayload = data; + } + + void receiveFakeMessage(String base64text) { + if (!isOpen) throw new StateError('The socket is not open'); + + var message = new Uint8List.fromList(base64.decode(base64text)); + _onMessageController.add(message); + } +} + +class FakeProtocolFactory implements TProtocolFactory { + FakeProtocolFactory(); + + TMessage message; + + getProtocol(TTransport transport) => new FakeProtocol(message); +} + +class FakeProtocol extends Mock implements TProtocol { + FakeProtocol(this._message); + + TMessage _message; + + readMessageBegin() => _message; +} + +Uint8List _getFramedResponse(Uint8List responseBytes) { + var byteOffset = TFramedTransport.headerByteCount; + var response = new Uint8List(byteOffset + responseBytes.length); + + response.buffer.asByteData().setInt32(0, responseBytes.length); + response.setAll(byteOffset, responseBytes); + + return response; +} diff --git a/src/jaegertracing/thrift/lib/dart/test/transport/t_transport_test.dart b/src/jaegertracing/thrift/lib/dart/test/transport/t_transport_test.dart new file mode 100644 index 000000000..0bb381ac8 --- /dev/null +++ b/src/jaegertracing/thrift/lib/dart/test/transport/t_transport_test.dart @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +library thrift.test.transport.t_socket_transport_test; + +import 'package:test/test.dart'; +import 'package:thrift/thrift.dart'; + +/// Common transport tests +void main() { + group('TTransportFactory', () { + test('transport is returned from base factory', () async { + TTransport result; + TTransport transport = null; + + var factory = new TTransportFactory(); + + result = await factory.getTransport(transport); + expect(result, isNull); + + transport = new TBufferedTransport(); + result = await factory.getTransport(transport); + + expect(result, transport); + }); + }); +} -- cgit v1.2.3