diff options
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java new file mode 100644 index 000000000..382d978db --- /dev/null +++ b/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.thrift; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +/** + * This class uses a single thread to set up non-blocking sockets to a set + * of remote servers (hostname and port pairs), and sends a same request to + * all these servers. It then fetches responses from servers. + * + * Parameters: + * int maxRecvBufBytesPerServer - an upper limit for receive buffer size + * per server (in byte). If a response from a server exceeds this limit, the + * client will not allocate memory or read response data for it. + * + * int fetchTimeoutSeconds - time limit for fetching responses from all + * servers (in second). After the timeout, the fetch job is stopped and + * available responses are returned. + * + * ByteBuffer requestBuf - request message that is sent to all servers. + * + * Output: + * Responses are stored in an array of ByteBuffers. Index of elements in + * this array corresponds to index of servers in the server list. Content in + * a ByteBuffer may be in one of the following forms: + * 1. First 4 bytes form an integer indicating length of following data, + * then followed by the data. + * 2. First 4 bytes form an integer indicating length of following data, + * then followed by nothing - this happens when the response data size + * exceeds maxRecvBufBytesPerServer, and the client will not read any + * response data. + * 3. No data in the ByteBuffer - this happens when the server does not + * return any response within fetchTimeoutSeconds. + * + * In some special cases (no servers are given, fetchTimeoutSeconds less + * than or equal to 0, requestBuf is null), the return is null. + * + * Note: + * It assumes all remote servers are TNonblockingServers and use + * TFramedTransport. + * + */ +public class TNonblockingMultiFetchClient { + + private static final Logger LOGGER = LoggerFactory.getLogger( + TNonblockingMultiFetchClient.class.getName() + ); + + // if the size of the response msg exceeds this limit (in byte), we will + // not read the msg + private int maxRecvBufBytesPerServer; + + // time limit for fetching data from all servers (in second) + private int fetchTimeoutSeconds; + + // store request that will be sent to servers + private ByteBuffer requestBuf; + private ByteBuffer requestBufDuplication; + + // a list of remote servers + private List<InetSocketAddress> servers; + + // store fetch results + private TNonblockingMultiFetchStats stats; + private ByteBuffer[] recvBuf; + + public TNonblockingMultiFetchClient(int maxRecvBufBytesPerServer, + int fetchTimeoutSeconds, ByteBuffer requestBuf, + List<InetSocketAddress> servers) { + this.maxRecvBufBytesPerServer = maxRecvBufBytesPerServer; + this.fetchTimeoutSeconds = fetchTimeoutSeconds; + this.requestBuf = requestBuf; + this.servers = servers; + + stats = new TNonblockingMultiFetchStats(); + recvBuf = null; + } + + public synchronized int getMaxRecvBufBytesPerServer() { + return maxRecvBufBytesPerServer; + } + + public synchronized int getFetchTimeoutSeconds() { + return fetchTimeoutSeconds; + } + + /** + * return a duplication of requestBuf, so that requestBuf will not + * be modified by others. + */ + public synchronized ByteBuffer getRequestBuf() { + if (requestBuf == null) { + return null; + } else { + if (requestBufDuplication == null) { + requestBufDuplication = requestBuf.duplicate(); + } + return requestBufDuplication; + } + } + + public synchronized List<InetSocketAddress> getServerList() { + if (servers == null) { + return null; + } + return Collections.unmodifiableList(servers); + } + + public synchronized TNonblockingMultiFetchStats getFetchStats() { + return stats; + } + + /** + * main entry function for fetching from servers + */ + public synchronized ByteBuffer[] fetch() { + // clear previous results + recvBuf = null; + stats.clear(); + + if (servers == null || servers.size() == 0 || + requestBuf == null || fetchTimeoutSeconds <= 0) { + return recvBuf; + } + + ExecutorService executor = Executors.newSingleThreadExecutor(); + MultiFetch multiFetch = new MultiFetch(); + FutureTask<?> task = new FutureTask(multiFetch, null); + executor.execute(task); + try { + task.get(fetchTimeoutSeconds, TimeUnit.SECONDS); + } catch(InterruptedException ie) { + // attempt to cancel execution of the task. + task.cancel(true); + LOGGER.error("interrupted during fetch: "+ie.toString()); + } catch(ExecutionException ee) { + // attempt to cancel execution of the task. + task.cancel(true); + LOGGER.error("exception during fetch: "+ee.toString()); + } catch(TimeoutException te) { + // attempt to cancel execution of the task. + task.cancel(true); + LOGGER.error("timeout for fetch: "+te.toString()); + } + + executor.shutdownNow(); + multiFetch.close(); + return recvBuf; + } + + /** + * Private class that does real fetch job. + * Users are not allowed to directly use this class, as its run() + * function may run forever. + */ + private class MultiFetch implements Runnable { + private Selector selector; + + /** + * main entry function for fetching. + * + * Server responses are stored in TNonblocingMultiFetchClient.recvBuf, + * and fetch statistics is in TNonblockingMultiFetchClient.stats. + * + * Sanity check for parameters has been done in + * TNonblockingMultiFetchClient before calling this function. + */ + public void run() { + long t1 = System.currentTimeMillis(); + + int numTotalServers = servers.size(); + stats.setNumTotalServers(numTotalServers); + + // buffer for receiving response from servers + recvBuf = new ByteBuffer[numTotalServers]; + // buffer for sending request + ByteBuffer sendBuf[] = new ByteBuffer[numTotalServers]; + long numBytesRead[] = new long[numTotalServers]; + int frameSize[] = new int[numTotalServers]; + boolean hasReadFrameSize[] = new boolean[numTotalServers]; + + try { + selector = Selector.open(); + } catch (IOException e) { + LOGGER.error("selector opens error: "+e.toString()); + return; + } + + for (int i = 0; i < numTotalServers; i++) { + // create buffer to send request to server. + sendBuf[i] = requestBuf.duplicate(); + // create buffer to read response's frame size from server + recvBuf[i] = ByteBuffer.allocate(4); + stats.incTotalRecvBufBytes(4); + + InetSocketAddress server = servers.get(i); + SocketChannel s = null; + SelectionKey key = null; + try { + s = SocketChannel.open(); + s.configureBlocking(false); + // now this method is non-blocking + s.connect(server); + key = s.register(selector, s.validOps()); + // attach index of the key + key.attach(i); + } catch (Exception e) { + stats.incNumConnectErrorServers(); + String err = String.format("set up socket to server %s error: %s", + server.toString(), e.toString()); + LOGGER.error(err); + // free resource + if (s != null) { + try {s.close();} catch (Exception ex) {} + } + if (key != null) { + key.cancel(); + } + } + } + + // wait for events + while (stats.getNumReadCompletedServers() + + stats.getNumConnectErrorServers() < stats.getNumTotalServers()) { + // if the thread is interrupted (e.g., task is cancelled) + if (Thread.currentThread().isInterrupted()) { + return; + } + + try{ + selector.select(); + } catch (Exception e) { + LOGGER.error("selector selects error: "+e.toString()); + continue; + } + + Iterator<SelectionKey> it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey selKey = it.next(); + it.remove(); + + // get previously attached index + int index = (Integer)selKey.attachment(); + + if (selKey.isValid() && selKey.isConnectable()) { + // if this socket throws an exception (e.g., connection refused), + // print error msg and skip it. + try { + SocketChannel sChannel = (SocketChannel)selKey.channel(); + sChannel.finishConnect(); + } catch (Exception e) { + stats.incNumConnectErrorServers(); + String err = String.format("socket %d connects to server %s " + + "error: %s", + index, servers.get(index).toString(), e.toString()); + LOGGER.error(err); + } + } + + if (selKey.isValid() && selKey.isWritable()) { + if (sendBuf[index].hasRemaining()) { + // if this socket throws an exception, print error msg and + // skip it. + try { + SocketChannel sChannel = (SocketChannel)selKey.channel(); + sChannel.write(sendBuf[index]); + } catch (Exception e) { + String err = String.format("socket %d writes to server %s " + + "error: %s", + index, servers.get(index).toString(), e.toString()); + LOGGER.error(err); + } + } + } + + if (selKey.isValid() && selKey.isReadable()) { + // if this socket throws an exception, print error msg and + // skip it. + try { + SocketChannel sChannel = (SocketChannel)selKey.channel(); + int bytesRead = sChannel.read(recvBuf[index]); + + if (bytesRead > 0) { + numBytesRead[index] += bytesRead; + + if (!hasReadFrameSize[index] && + recvBuf[index].remaining()==0) { + // if the frame size has been read completely, then prepare + // to read the actual frame. + frameSize[index] = recvBuf[index].getInt(0); + + if (frameSize[index] <= 0) { + stats.incNumInvalidFrameSize(); + String err = String.format("Read an invalid frame size %d" + + " from %s. Does the server use TFramedTransport? ", + frameSize[index], servers.get(index).toString()); + LOGGER.error(err); + sChannel.close(); + continue; + } + + if (frameSize[index] + 4 > stats.getMaxResponseBytes()) { + stats.setMaxResponseBytes(frameSize[index]+4); + } + + if (frameSize[index] + 4 > maxRecvBufBytesPerServer) { + stats.incNumOverflowedRecvBuf(); + String err = String.format("Read frame size %d from %s," + + " total buffer size would exceed limit %d", + frameSize[index], servers.get(index).toString(), + maxRecvBufBytesPerServer); + LOGGER.error(err); + sChannel.close(); + continue; + } + + // reallocate buffer for actual frame data + recvBuf[index] = ByteBuffer.allocate(frameSize[index] + 4); + recvBuf[index].putInt(frameSize[index]); + + stats.incTotalRecvBufBytes(frameSize[index]); + hasReadFrameSize[index] = true; + } + + if (hasReadFrameSize[index] && + numBytesRead[index] >= frameSize[index]+4) { + // has read all data + sChannel.close(); + stats.incNumReadCompletedServers(); + long t2 = System.currentTimeMillis(); + stats.setReadTime(t2-t1); + } + } + } catch (Exception e) { + String err = String.format("socket %d reads from server %s " + + "error: %s", + index, servers.get(index).toString(), e.toString()); + LOGGER.error(err); + } + } + } + } + } + + /** + * dispose any resource allocated + */ + public void close() { + try { + if (selector.isOpen()) { + Iterator<SelectionKey> it = selector.keys().iterator(); + while (it.hasNext()) { + SelectionKey selKey = it.next(); + SocketChannel sChannel = (SocketChannel)selKey.channel(); + sChannel.close(); + } + + selector.close(); + } + } catch (IOException e) { + LOGGER.error("free resource error: "+e.toString()); + } + } + } +}
\ No newline at end of file |