У нас есть синглтон groovy, который использует PoolingHttpClientConnectionManager (httpclient: 4.3.6) с размером пула 200, чтобы обрабатывать очень высокие параллельные подключения к службе поиска и обрабатывает ответ xml.
Несмотря на то, что он задал таймауты, он зависает примерно раз в месяц, но отлично работает в остальное время.
Ниже приведено синглтон groovy. Метод retrieveInputFromURL, похоже, блокируется на client.execute(get);
@Singleton(strict=false)
class StreamManagerUtil {
// Instantiate once and cache for lifetime of Signleton class
private static PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
private static CloseableHttpClient client;
private static final IdleConnectionMonitorThread staleMonitor = new IdleConnectionMonitorThread(connManager);
private int warningLimit;
private int readTimeout;
private int connectionTimeout;
private int connectionFetchTimeout;
private int poolSize;
private int routeSize;
PropertyManager propertyManager = PropertyManagerFactory.getInstance().getPropertyManager("sebe.properties")
StreamManagerUtil() {
// Initialize all instance variables in singleton from properties file
readTimeout = 6
connectionTimeout = 6
connectionFetchTimeout =6
// Pooling
poolSize = 200
routeSize = 50
// Connection pool size and number of routes to cache
connManager.setMaxTotal(poolSize);
connManager.setDefaultMaxPerRoute(routeSize);
// ConnectTimeout : time to establish connection with GSA
// ConnectionRequestTimeout : time to get connection from pool
// SocketTimeout : waiting for packets form GSA
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(connectionTimeout * 1000)
.setConnectionRequestTimeout(connectionFetchTimeout * 1000)
.setSocketTimeout(readTimeout * 1000).build();
// Keep alive for 5 seconds if server does not have keep alive header
ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator
(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase
("timeout")) {
return Long.parseLong(value) * 1000;
}
}
return 5 * 1000;
}
};
// Close all connection older than 5 seconds. Run as separate thread.
staleMonitor.start();
staleMonitor.join(1000);
client = HttpClients.custom().setDefaultRequestConfig(config).setKeepAliveStrategy(myStrategy).setConnectionManager(connManager).build();
}
private retrieveInputFromURL (String categoryUrl, String xForwFor, boolean isXml) throws Exception {
URL url = new URL( categoryUrl );
GPathResult searchResponse = null
InputStream inputStream = null
HttpResponse response;
HttpGet get;
try {
long startTime = System.nanoTime();
get = new HttpGet(categoryUrl);
response = client.execute(get);
int resCode = response.getStatusLine().getStatusCode();
if (xForwFor != null) {
get.setHeader("X-Forwarded-For", xForwFor)
}
if (resCode == HttpStatus.SC_OK) {
if (isXml) {
extractXmlString(response)
} else {
StringBuffer buffer = buildStringFromResponse(response)
return buffer.toString();
}
}
}
catch (Exception e)
{
throw e;
}
finally {
// Release connection back to pool
if (response != null) {
EntityUtils.consume(response.getEntity());
}
}
}
private extractXmlString(HttpResponse response) {
InputStream inputStream = response.getEntity().getContent()
XmlSlurper slurper = new XmlSlurper()
slurper.setFeature("http://xml.org/sax/features/validation", false)
slurper.setFeature("http://apache.org/xml/features/disallow-doctype-decl", false)
slurper.setFeature("http://apache.org/xml/features/nonvalidating/load-dtd-grammar", false)
slurper.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
return slurper.parse(inputStream)
}
private StringBuffer buildStringFromResponse(HttpResponse response) {
StringBuffer buffer= new StringBuffer();
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
String line = "";
while ((line = rd.readLine()) != null) {
buffer.append(line);
System.out.println(line);
}
return buffer
}
public class IdleConnectionMonitorThread extends Thread {
private final HttpClientConnectionManager connMgr;
private volatile boolean shutdown;
public IdleConnectionMonitorThread
(PoolingHttpClientConnectionManager connMgr) {
super();
this.connMgr = connMgr;
}
@Override
public void run() {
try {
while (!shutdown) {
synchronized (this) {
wait(5000);
connMgr.closeExpiredConnections();
connMgr.closeIdleConnections(10, TimeUnit.SECONDS);
}
}
} catch (InterruptedException ex) {
// Ignore
}
}
public void shutdown() {
shutdown = true;
synchronized (this) {
notifyAll();
}
}
}
Я также обнаружил, что нашел это в журнале, что заставило меня поверить, что это произошло, ожидая ответа данных
java.net.SocketTimeoutException: время ожидания проверяется на java.net.SocketInputStream.socketRead0 (собственный метод) на java.net.SocketInputStream.read(SocketInputStream.java:150) на java.net.SocketInputStream.read(SocketInputStream. java: 121) at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
Выводы до сих пор:
- Мы используем java 1.8u25. Существует открытый вопрос по аналогичному сценарию https://bugs.openjdk.java.net/browse/JDK-8075484
- HttpClient имел аналогичный отчет https://issues.apache.org/jira/browse/HTTPCLIENT-1589, но это было исправлено в версия 4.3.6, которую мы используем
Вопросы
- Может ли это быть проблемой синхронизации? По моему пониманию, несмотря на то, что к singleton обращаются к нескольким потокам, единственными доступными данными являются кэшированный CloseableHttpClient
- Есть ли что-то еще принципиально неправильное в этом коде, подход, который может вызвать такое поведение?