Dépôt officiel du code source de l'ERP OpenConcerto
Rev 174 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 2011-2019 OpenConcerto, by ILM Informatique. All rights reserved.
*
* The contents of this file are subject to the terms of the GNU General Public License Version 3
* only ("GPL"). You may not use this file except in compliance with the License. You can obtain a
* copy of the License at http://www.gnu.org/licenses/gpl-3.0.html See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each file.
*/
package org.openconcerto.sql.model;
import org.openconcerto.sql.Log;
import org.openconcerto.sql.State;
import org.openconcerto.sql.request.SQLCache;
import org.openconcerto.sql.utils.SQLUtils;
import org.openconcerto.utils.CompareUtils;
import org.openconcerto.utils.ExceptionHandler;
import org.openconcerto.utils.ExceptionUtils;
import org.openconcerto.utils.RTInterruptedException;
import org.openconcerto.utils.ThreadFactory;
import org.openconcerto.utils.Tuple2;
import org.openconcerto.utils.cache.CacheResult;
import org.openconcerto.utils.cache.ICacheSupport;
import org.openconcerto.utils.cc.ITransformerExn;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.SQLTransientException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import org.apache.commons.dbcp.AbandonedConfig;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.PoolableConnection;
import org.apache.commons.dbcp.PoolableConnectionFactory;
import org.apache.commons.dbcp.PoolingConnection;
import org.apache.commons.dbcp.PoolingDataSource;
import org.apache.commons.dbcp.SQLNestedException;
import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.dbutils.RowProcessor;
import org.apache.commons.dbutils.handlers.ArrayHandler;
import org.apache.commons.dbutils.handlers.ArrayListHandler;
import org.apache.commons.dbutils.handlers.MapHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.commons.dbutils.handlers.ScalarHandler;
import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedObjectPoolFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.h2.constant.ErrorCode;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
/**
* Une source de donnée SQL.
*
* @author ILM Informatique 10 juin 2004
*/
@ThreadSafe
public final class SQLDataSource extends BasicDataSource implements Cloneable {
// MAYBE add a cache, but ATTN synchronized : one connection per thread, but only one shared DS
/** A map of supported database systems and associated drivers. */
static public final Map<SQLSystem, String> DRIVERS;
static {
DRIVERS = new HashMap<SQLSystem, String>();
DRIVERS.put(SQLSystem.MYSQL, "com.mysql.jdbc.Driver");
DRIVERS.put(SQLSystem.POSTGRESQL, "org.postgresql.Driver");
DRIVERS.put(SQLSystem.DERBY, "org.apache.derby.jdbc.ClientDriver");
DRIVERS.put(SQLSystem.H2, "org.h2.Driver");
DRIVERS.put(SQLSystem.MSSQL, "com.microsoft.sqlserver.jdbc.SQLServerDriver");
}
// timeouts in seconds
static public final int loginTimeOut = 15;
static public final int socketTimeOut = 8 * 60;
// in milliseconds
static public int QUERY_TUNING = 0;
static public interface IgnoringRowProcessor extends RowProcessor {
@Override
public Map<String, Object> toMap(ResultSet rs) throws SQLException;
/**
* Convert the passed result set to a map, ignoring some columns.
*
* @param rs the result set.
* @param toIgnore which columns' label to ignore.
* @return a map with all columns of <code>rs</code> except <code>toIgnore</code>.
* @throws SQLException if an error occurs while reading <code>rs</code>.
*/
public Map<String, Object> toMap(ResultSet rs, Set<String> toIgnore) throws SQLException;
}
// ignoring case-sensitive processor
static private class IgnoringCSRowProcessor extends BasicRowProcessor implements IgnoringRowProcessor {
@Override
public Map<String, Object> toMap(ResultSet rs) throws SQLException {
return toMap(rs, Collections.<String> emptySet());
}
// on ne veut pas de CaseInsensitiveMap
@Override
public Map<String, Object> toMap(ResultSet rs, Set<String> toIgnore) throws SQLException {
final Map<String, Object> result = new HashMap<String, Object>();
final ResultSetMetaData rsmd = rs.getMetaData();
final int cols = rsmd.getColumnCount();
for (int i = 1; i <= cols; i++) {
// ATTN use label not base name (eg "des" in "SELECT DESIGNATION as des FROM ...")
final String label = rsmd.getColumnLabel(i);
if (!toIgnore.contains(label))
result.put(label, rs.getObject(i));
}
return result;
}
}
static public final IgnoringRowProcessor ROW_PROC = new IgnoringCSRowProcessor();
// all thread safe
public static final ColumnListHandler COLUMN_LIST_HANDLER = new ColumnListHandler();
public static final ArrayListHandler ARRAY_LIST_HANDLER = new ArrayListHandler();
public static final ListListHandlerGeneric<Object> LIST_LIST_HANDLER = ListListHandlerGeneric.create(Object.class, null);
public static final ArrayHandler ARRAY_HANDLER = new ArrayHandler();
public static final ScalarHandler SCALAR_HANDLER = new ScalarHandler();
public static final MapListHandler MAP_LIST_HANDLER = new MapListHandler(ROW_PROC);
public static final MapHandler MAP_HANDLER = new MapHandler(ROW_PROC);
// Cache, linked to cacheEnable and tables
@GuardedBy("this")
private SQLCache<List<?>, Object> cache;
@GuardedBy("this")
private boolean cacheEnabled;
private final PropertyChangeListener descL;
// tables that can be used in queries (and thus can impact the cache)
@GuardedBy("this")
private Set<SQLTable> tables;
private static int count = 0; // compteur de requetes
private final DBSystemRoot sysRoot;
private ConnectionFactory connectionFactory;
// no need to synchronize multiple call to this attribute since we only access the
// Thread.currentThread() key
@GuardedBy("handlers")
private final Map<Thread, HandlersStack> handlers;
@GuardedBy("this")
private ExecutorService exec = null;
private final Object setInitialShemaLock = new String("initialShemaWriteLock");
// linked to initialSchema and uptodate
@GuardedBy("this")
private boolean initialShemaSet;
@GuardedBy("this")
private String initialShema;
// which Connection have the right default schema
@GuardedBy("this")
private final Map<Connection, Object> schemaUptodate;
// which Connection aren't invalidated
@GuardedBy("this")
private final Map<Connection, Object> uptodate;
private volatile int retryWait;
@GuardedBy("this")
private boolean blockWhenExhausted;
@GuardedBy("this")
private long softMinEvictableIdleTimeMillis;
@GuardedBy("this")
private int txIsolation;
@GuardedBy("this")
private Integer dbTxIsolation;
@GuardedBy("this")
private boolean checkOnceDBTxIsolation;
private final Object testLock = new String("testLock");
public SQLDataSource(DBSystemRoot sysRoot, String base, String login, String pass) {
this(sysRoot, sysRoot.getServer().getURL(base), login, pass, Collections.<SQLTable> emptySet());
}
private SQLDataSource(DBSystemRoot sysRoot, String url, String login, String pass, Set<SQLTable> tables) {
this(sysRoot);
final SQLSystem system = getSystem();
if (!DRIVERS.containsKey(system))
throw new IllegalArgumentException("unknown database system: " + system);
this.setDriverClassName(DRIVERS.get(system));
this.setUrl("jdbc:" + system.getJDBCName() + ":" + url);
this.setUsername(login);
this.setPassword(pass);
this.setTables(tables);
if (system == SQLSystem.MYSQL) {
this.addConnectionProperty("transformedBitIsBoolean", "true");
// by default allowMultiQueries, since it's more convenient (i.e. pass String around
// instead of List<String>) and faster (less trips to the server, allow
// SQLUtils.executeMultiple())
this.addConnectionProperty("allowMultiQueries", "true");
} else if (system == SQLSystem.MSSQL) {
// Otherwise we get SQLState S0002 instead of 42S02 (needed at least in
// SQLBase.getFwkMetadata())
// http://msdn.microsoft.com/fr-fr/library/ms712451.aspx
// http://technet.microsoft.com/en-us/library/ms378988(v=sql.105).aspx
this.addConnectionProperty("xopenStates", "true");
// see
// https://connect.microsoft.com/SQLServer/feedback/details/295907/resultsetmetadata-gettablename-returns-null-or-inconsistent-results
// http://social.msdn.microsoft.com/Forums/sqlserver/en-US/55e8cbb2-b11c-446e-93ab-dc30658caf99/resultsetmetadatagettablename-returns-instead-of-table-name?forum=sqldataaccess
// 1. The statement that the resultset belongs to was created with
// TYPE_SCROLL_INSENSITIVE or TYPE_SCROLL_SENSITIVE : The full table or view name will
// be returned
// 2. The statement that the resultset belongs to was created without specifying the
// cursor type, or the cursor type is TYPE_FORWARD_ONLY : The full table or view name
// will be returned if the column is a text, ntext, or image, else empty string.
this.addConnectionProperty("selectMethod", "cursor");
}
this.setLoginTimeout(loginTimeOut);
this.setSocketTimeout(socketTimeOut);
this.setTCPKeepAlive(true);
this.setRetryWait(7000);
// ATTN DO NOT call execute() or any method that might create a connection
// since at this point dsInit() has not been called and thus connection properties might be
// missing (eg allowMultiQueries). And the faulty connection will stay in the pool.
}
@Override
public final void setLoginTimeout(int timeout) {
if (this.getSystem() == SQLSystem.MYSQL) {
this.addConnectionProperty("connectTimeout", timeout + "000");
} else if (this.getSystem() == SQLSystem.POSTGRESQL || this.getSystem() == SQLSystem.MSSQL) {
this.addConnectionProperty("loginTimeout", timeout + "");
} else {
Log.get().warning("Ignoring login timeout for " + this);
}
}
public final void setSocketTimeout(int timeout) {
if (this.getSystem() == SQLSystem.MYSQL) {
this.addConnectionProperty("socketTimeout", timeout + "000");
} else if (this.getSystem() == SQLSystem.H2) {
// org.h2.util.NetUtils.createSocket() doesn't use setSoTimeout(), so this is the next
// best thing. But it isn't checked everywhere, see DbSettings.maxQueryTimeout
this.addConnectionProperty("MAX_QUERY_TIMEOUT", timeout + "000");
} else if (this.getSystem() == SQLSystem.POSTGRESQL) {
this.addConnectionProperty("socketTimeout", timeout + "");
} else {
Log.get().log(getLogLevelForIgnoredTCPParam(), "Ignoring socket timeout for " + this);
}
}
// if TCP isn't used or is used to connect to localhost, TCP should be quite robust
private final Level getLogLevelForIgnoredTCPParam() {
return this.sysRoot.getServer().isLocalhost() ? Level.CONFIG : Level.WARNING;
}
public final void setTCPKeepAlive(final boolean b) {
if (this.getSystem() == SQLSystem.POSTGRESQL || this.getSystem() == SQLSystem.MYSQL) {
this.addConnectionProperty("tcpKeepAlive", String.valueOf(b));
} else {
Log.get().log(getLogLevelForIgnoredTCPParam(), "Ignoring TCP keep alive for " + this);
}
}
/**
* Set the number of milliseconds to wait before retrying if a connection fails to establish or
* if a query fails to execute.
*
* @param retryWait the number of milliseconds to wait, negative to disable retrying.
*/
public final void setRetryWait(int retryWait) {
this.retryWait = retryWait;
}
synchronized void setTables(Set<SQLTable> tables) {
// don't change the cache if we're only adding tables
final boolean update = this.cache == null || !tables.containsAll(this.tables);
this.tables = Collections.unmodifiableSet(new HashSet<SQLTable>(tables));
if (update)
updateCache();
}
private synchronized Set<SQLTable> getTables() {
return this.tables;
}
private synchronized void updateCache() {
if (this.cache != null)
this.cache.getSupp().die();
this.cache = createCache(null);
for (final HandlersStack s : this.handlers.values()) {
s.updateCache();
}
}
// the cache for committed data
synchronized final SQLCache<List<?>, Object> getCommittedCache() {
return this.cache;
}
final SQLCache<List<?>, Object> getCache() {
// transactions are isolated from one another, so their caches should be too
final HandlersStack stack = getHandlersStack();
if (stack != null && stack.hasTransaction())
return stack.getCache();
else
return this.getCommittedCache();
}
synchronized SQLCache<List<?>, Object> createCache(final TransactionPoint o) {
final SQLCache<List<?>, Object> res;
if (this.isCacheEnabled() && this.tables.size() > 0) {
// the general cache should wait for transactions to end, but the cache of transactions
// must not.
final boolean committedCache = o == null;
final Object desc = committedCache ? this : o;
final ICacheSupport<SQLData> cacheSupp = committedCache ? null : this.cache.getSupp();
res = new SQLCache<List<?>, Object>(cacheSupp, 30, 30, "results of " + desc.toString(), o) {
@Override
protected String getCacheSuppName(String cacheName) {
assert committedCache : "Creating extra ICacheSupport";
return SQLDataSource.this.toString();
}
};
} else {
res = null;
}
return res;
}
/**
* Enable or disable the cache. ATTN if you enable the cache you must
* {@link SQLTable#fire(SQLTableEvent) fire} table events, or use a class that does like
* {@link SQLRowValues}.
*
* @param b <code>true</code> to enable the cache.
*/
public final synchronized void setCacheEnabled(boolean b) {
if (this.cacheEnabled != b) {
this.cacheEnabled = b;
updateCache();
}
}
public final synchronized boolean isCacheEnabled() {
return this.cacheEnabled;
}
/* pour le clonage */
private SQLDataSource(DBSystemRoot sysRoot) {
this.sysRoot = sysRoot;
// on a besoin d'une implementation synchronisée
this.handlers = new Hashtable<Thread, HandlersStack>();
// weak, since this is only a hint to avoid initializing the connection
// on each borrowal
this.schemaUptodate = new WeakHashMap<Connection, Object>();
this.uptodate = new WeakHashMap<Connection, Object>();
this.initialShemaSet = false;
this.initialShema = null;
// used by #getNewConnection() when there's an exception and by #validateDBConnectivity()
this.setValidationQuery("SELECT 1");
// We must set a socket timeout high enough for large queries but the validation query
// should return almost instantly. Not too low since overloaded links may have very high
// latencies.
this.setValidationQueryTimeout(6);
// don't test on borrow, this would double the queries.
this.setTestOnBorrow(false);
// don't test on return, the connection was just used and this would double the queries.
this.setTestOnReturn(false);
// for now don't test on idle as our evictor is run quite often to promptly close unneeded
// connections. If it is slowed down, we risk overloading the server or just hit its maximum
// connection count. MAYBE enable when we upgrade to DBCP 2, since it depends on POOL 2
// which has an EvictionPolicy, so for example we could test connections after a smaller
// amount of time than minEvictableIdleTimeMillis.
this.setTestWhileIdle(false);
this.setInitialSize(3);
// neither DOS the server...
this.setBlockWhenExhausted(true);
this.setMaxActive(12);
// ... nor us
this.setMaxWait(5000);
// creating connections is quite costly so make sure we always have a couple free
this.setMinIdle(2);
// but not too much as it can lock out other users (the server has a max connection count)
this.setMaxIdle(10);
// check 5 connections every 4 seconds
this.setTimeBetweenEvictionRunsMillis(4000);
this.setNumTestsPerEvictionRun(5);
// kill extra (above minIdle) connections after 40s
this.setSoftMinEvictableIdleTimeMillis(TimeUnit.SECONDS.toMillis(40));
// kill idle connections after 30 minutes (even if it means re-creating some new ones
// immediately afterwards to ensure minIdle connections). For now it's a poor man's solution
// for lacking testWhileIdle. After that time NAT tables expires, the path to the server
// might be broken...
this.setMinEvictableIdleTimeMillis(TimeUnit.MINUTES.toMillis(30));
// the default of many systems
this.txIsolation = Connection.TRANSACTION_READ_COMMITTED;
// by definition unknown without a connection
this.dbTxIsolation = null;
// it's rare that DB configuration changes, and it's expensive to add a trip to the server
// for each new connection
this.checkOnceDBTxIsolation = true;
// see #createDataSource() for properties not supported by this class
this.tables = Collections.emptySet();
this.descL = new PropertyChangeListener() {
@Override
public void propertyChange(PropertyChangeEvent evt) {
if (evt.getPropertyName().equals("descendants")) {
// the dataSource must always have all tables, to listen to them for its cache
setTables(((DBSystemRoot) evt.getSource()).getDescs(SQLTable.class));
}
}
};
this.sysRoot.addListener(this.descL);
this.cache = null;
this.cacheEnabled = false;
}
/**
* Exécute la requête et retourne le résultat sous forme de liste de map. Si la requete va
* retourner beaucoup de lignes, il est peut-être préférable d'utiliser un ResultSetHandler.
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
* @see MapListHandler
* @see #execute(String, ResultSetHandler)
*/
public List execute(String query) {
return (List) this.execute(query, MAP_LIST_HANDLER);
}
/**
* Exécute la requête et retourne la première colonne uniquement.
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
* @see ColumnListHandler
* @see #execute(String, ResultSetHandler)
*/
public List executeCol(String query) {
return (List) this.execute(query, COLUMN_LIST_HANDLER);
}
/**
* Exécute la requête et retourne le résultat sous forme de liste de tableau. Si la requete va
* retourner beaucoup de lignes, il est peut-être préférable d'utiliser un ResultSetHandler.
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
* @see ArrayListHandler
* @see #execute(String, ResultSetHandler)
*/
public List executeA(String query) {
return (List) this.execute(query, ARRAY_LIST_HANDLER);
}
/**
* Exécute la requête et retourne la première ligne du résultat sous forme de map.
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
* @see MapHandler
* @see #execute(String)
*/
public Map<String, Object> execute1(String query) {
return (Map<String, Object>) this.execute(query, MAP_HANDLER);
}
/**
* Exécute la requête et retourne la première ligne du résultat sous forme de tableau.
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
* @see ArrayHandler
* @see #executeA(String)
*/
public Object[] executeA1(String query) {
return (Object[]) this.execute(query, ARRAY_HANDLER);
}
/**
* Exécute la requête et retourne la valeur de la premiere colonne de la premiere ligne.
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
*/
public Object executeScalar(String query) {
return this.execute(query, SCALAR_HANDLER);
}
/**
* Exécute la requête et passe le résultat au ResultSetHandler.
*
* @param query le requête à exécuter.
* @param rsh le handler à utiliser, ou <code>null</code>.
* @return le résultat du handler, <code>null</code> si rsh est <code>null</code>.
* @see #execute(String)
*/
public Object execute(String query, ResultSetHandler rsh) {
return this.execute(query, rsh, null);
}
/**
* Execute <code>query</code> within <code>c</code>, passing the result set to <code>rsh</code>.
*
* @param query the query to perform.
* @param rsh what to do with the result, can be <code>null</code>.
* @param changeState whether <code>query</code> changes the state of a connection.
* @return the result of <code>rsh</code>, <code>null</code> if rsh or the resultSet is
* <code>null</code>.
* @throws RTInterruptedException if the current thread is interrupted while waiting for the
* cache or for the database.
*/
public final Object execute(final String query, final ResultSetHandler rsh, final boolean changeState) throws RTInterruptedException {
return this.execute(query, rsh, changeState, null);
}
private Object execute(final String query, final ResultSetHandler rsh, final Connection c) throws RTInterruptedException {
// false since the vast majority of request do NOT change the state
return this.execute(query, rsh, false, c);
}
final List<Object> getCacheKey(final String query, final ResultSetHandler rsh) {
return query.startsWith("SELECT") ? Arrays.asList(new Object[] { query, rsh }) : null;
}
/**
* Execute <code>query</code> within <code>c</code>, passing the result set to <code>rsh</code>.
*
* @param query the query to perform.
* @param rsh what to do with the result, can be <code>null</code>.
* @param changeState whether <code>query</code> changes the state of a connection.
* @param passedConn the sql connection to use.
* @return the result of <code>rsh</code>, <code>null</code> if rsh or the resultSet is
* <code>null</code>.
* @throws RTInterruptedException if the current thread is interrupted while waiting for the
* cache or for the database.
*/
private Object execute(final String query, final ResultSetHandler rsh, final boolean changeState, final Connection passedConn) throws RTInterruptedException {
final long timeMs = System.currentTimeMillis();
final long time = System.nanoTime();
// some systems refuse to execute nothing
if (query.length() == 0) {
SQLRequestLog.log(query, "Pas de requête.", timeMs, time);
return null;
}
final IResultSetHandler irsh = rsh instanceof IResultSetHandler ? (IResultSetHandler) rsh : null;
final boolean readCache = irsh == null || irsh.readCache();
final boolean canWriteCache = irsh == null || irsh.canWriteCache();
final SQLCache<List<?>, Object> cache = !readCache && !canWriteCache ? null : this.getCache();
final List<Object> key = cache == null ? null : getCacheKey(query, rsh);
final CacheResult<Object> cacheRes;
if (key != null) {
final Set<? extends SQLData> data = irsh == null || irsh.getCacheModifiers() == null ? this.getTables() : irsh.getCacheModifiers();
cacheRes = cache.check(key, readCache, canWriteCache, data);
if (cacheRes.getState() == CacheResult.State.INTERRUPTED)
throw new RTInterruptedException("interrupted while waiting for the cache");
else if (cacheRes.getState() == CacheResult.State.VALID) {
// cache actif
if (State.DEBUG)
State.INSTANCE.addCacheHit();
SQLRequestLog.log(query, "En cache.", timeMs, time);
return cacheRes.getRes();
}
} else {
cacheRes = null;
}
Object result = null;
QueryInfo info = null;
final long afterCache = System.nanoTime();
final long afterQueryInfo, afterExecute, afterHandle;
int count = 0;
try {
info = new QueryInfo(query, changeState, passedConn);
try {
afterQueryInfo = System.nanoTime();
final Object[] res = this.executeTwice(info);
final Statement stmt = (Statement) res[0];
ResultSet rs = (ResultSet) res[1];
// TODO 1. rename #execute(String) to #executeN(String)
// and make #execute(String) do #execute(String, null)
// 2. let null rs pass to rsh
// otherwise you write ds.execute("req", new ResultSetHandler() {
// public Object handle(ResultSet rs) throws SQLException {
// return "OK";
// }
// });
// and OK won't be returned if "req" returns a null rs.
afterExecute = System.nanoTime();
if (rsh != null && rs != null) {
if (this.getSystem() == SQLSystem.DERBY || this.getSystem() == SQLSystem.POSTGRESQL) {
rs = new SQLResultSet(rs);
}
result = rsh.handle(rs);
count = SQLResultSet.getRowProcessedCount(rs);
}
afterHandle = System.nanoTime();
stmt.close();
// if key was added to the cache
if (key != null) {
synchronized (this) {
putInCache(cache, irsh, cacheRes, result);
}
}
info.releaseConnection();
} catch (SQLException exn) {
// don't usually do a getSchema() as it access the db
throw new IllegalStateException("Impossible d'accéder au résultat de " + query + "\n in " + this, exn);
}
} catch (RuntimeException e) {
// for each #check() there must be a #removeRunning()
// let the cache know we ain't gonna tell it the result
if (cacheRes != null)
cache.removeRunning(cacheRes);
if (info != null)
info.releaseConnection(e);
throw e;
}
SQLRequestLog.log(query, "", info.getConnection(), timeMs, time, afterCache, afterQueryInfo, afterExecute, afterHandle, System.nanoTime(), count);
return result;
}
private synchronized void putInCache(final SQLCache<List<?>, Object> cache, final IResultSetHandler irsh, final CacheResult<Object> cacheRes, Object result) {
if (irsh != null && irsh.writeCache() || irsh == null && IResultSetHandler.shouldCache(result)) {
cache.put(cacheRes, result);
} else {
cache.removeRunning(cacheRes);
}
}
private synchronized final ExecutorService getExec() {
if (this.exec == null) {
// not daemon since we want the connections to be returned
final ThreadFactory factory = new ThreadFactory(SQLDataSource.class.getSimpleName() + " " + this.toString() + " exec n° ", false);
// a rather larger number of threads since all they do is wait severals seconds
this.exec = new ThreadPoolExecutor(0, 32, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
}
return this.exec;
}
private final class QueryInfo {
private final String query;
// whether query change the state of our connection
private final boolean changeState;
// can change if private
private Connection c;
// whether we acquired a new connection (and thus can do whatever we want with it)
private final boolean privateConnection;
QueryInfo(String query, boolean changeState, final Connection passedConn) {
super();
this.query = query;
this.changeState = changeState;
// if passedConn is provided use it, else we need to find one
boolean acquiredConnection = false;
final Connection foundConn;
if (passedConn != null)
foundConn = passedConn;
else if (!handlingConnection()) {
foundConn = getNewConnection();
acquiredConnection = true;
} else {
final HandlersStack threadHandlers = getHandlersStack();
if (!changeState || threadHandlers.isChangeAllowed()) {
foundConn = threadHandlers.getConnection();
} else {
throw new IllegalStateException("the passed query change the connection's state and the current thread has a connection which will thus be changed."
+ " A possible solution is to execute it in the setup() of a ConnectionHandler\n" + query);
}
}
this.privateConnection = acquiredConnection;
this.c = foundConn;
}
public final Connection getConnection() {
return this.c;
}
public final String getQuery() {
return this.query;
}
void releaseConnection(RuntimeException e) {
// MySQL reste des fois bloqué dans SocketInputStream.socketRead0()
// (le serveur ayant tué la query)
if (e instanceof InterruptedQuery && getSystem() == SQLSystem.MYSQL) {
final ExecutorThread thread = ((InterruptedQuery) e).getThread();
if (this.privateConnection) {
if (this.changeState)
// no need to try to save the connection, it is no longer valid
this.releaseConnection();
else {
// test if the connection is still valid before returning it to the pool
getExec().execute(new Runnable() {
public void run() {
// on attend un peu
try {
thread.join(1500);
// pour voir si on meurt
if (thread.isAlive()) {
Log.get().warning(getFailedCancelMsg());
closeConnection(getConnection());
} else {
// la connexion est ok, on la remet dans le pool
returnConnection(getConnection());
}
} catch (InterruptedException e) {
// the datasource is closing
Log.get().fine("Interrupted while joining " + getQuery());
closeConnection(getConnection());
}
}
});
}
} else {
// try to save the connection since it is used by others
try {
// clear the interrupt status set by InterruptedQuery
// so that we can wait on thread
Thread.interrupted();
thread.join(500);
} catch (InterruptedException e2) {
System.err.println("ignore, we are already interrupted");
e2.printStackTrace();
}
// remettre le flag pour les méthodes appelantes.
Thread.currentThread().interrupt();
// connection is still stuck
if (thread.isAlive()) {
throw new IllegalStateException(getFailedCancelMsg(), e);
} else
this.releaseConnection();
}
} else
this.releaseConnection();
}
void releaseConnection() {
// have we borrowed a connection, otherwise it is not our responsibility to release it.
if (this.privateConnection) {
if (this.changeState)
// the connection is no longer in a pristine state so close it
closeConnection(this.getConnection());
else
// otherwise we can reuse it
returnConnection(this.getConnection());
}
}
private final String getFailedCancelMsg() {
return "cancel of " + System.identityHashCode(getConnection()) + " failed for " + getQuery();
}
final boolean canObtainNewConnection() {
return this.privateConnection;
}
// an error has occured, try within another connection if possible
final Connection obtainNewConnection() {
if (!this.canObtainNewConnection()) {
return null;
} else {
// ATTN should be sure that our connection was not already closed,
// see #closeConnection()
closeConnection(this.getConnection());
this.c = borrowConnection(true);
return this.getConnection();
}
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " private connection: " + this.privateConnection + " query: " + this.getQuery();
}
}
/**
* Whether the current thread has called {@link #useConnection(ConnectionHandler)}.
*
* @return <code>true</code> if within <code>useConnection()</code> and thus safe to call
* {@link #getConnection()}.
*/
public final boolean handlingConnection() {
return this.handlers.containsKey(Thread.currentThread());
}
private final HandlersStack getHandlersStack() {
return this.handlers.get(Thread.currentThread());
}
public final <T, X extends Exception> T useConnection(final ITransformerExn<? super SQLDataSource, T, X> run) throws SQLException, X {
return this.useConnection(new ConnectionHandlerNoSetup<T, X>() {
@Override
public T handle(SQLDataSource ds) throws X {
return run.transformChecked(ds);
}
});
}
/**
* Use a single connection to execute <code>handler</code>.
*
* @param <T> type of return.
* @param <X> type of exception.
* @param handler what to do with the connection.
* @return what <code>handler</code> returned.
* @throws SQLException if an exception happens in setup() or restore().
* @throws X if handle() throws an exception.
* @see ConnectionHandler
*/
public final <T, X extends Exception> T useConnection(ConnectionHandler<T, X> handler) throws SQLException, X {
return this.useConnection(handler, false);
}
private final <T, X extends Exception> T useConnection(ConnectionHandler<T, X> handler, final boolean force) throws SQLException, X {
final HandlersStack h;
boolean connOutsidePool = false;
if (!this.handlingConnection()) {
Connection conn;
try {
conn = this.getNewConnection();
} catch (NoSuchElementException e) {
if (force) {
conn = this.connectionFactory.createConnection();
connOutsidePool = true;
} else {
throw e;
}
}
h = new HandlersStack(this, conn, handler);
this.handlers.put(h.getThread(), h);
} else if (handler.canRestoreState()) {
h = this.getHandlersStack().push(handler);
} else
throw new IllegalStateException("this thread has already called useConnection() and thus expect its state, but the passed handler cannot restore state: " + handler);
Connection conn = null;
// before or after compute, RuntimeException or SQLException
Exception beforeExn = null, afterExn = null;
// X or SQLException
Exception computeExn = null;
try {
conn = h.getConnection();
h.setChangeAllowed(true);
handler.setup(conn);
h.setChangeAllowed(false);
try {
handler.compute(this);
} catch (Exception e) {
computeExn = e;
}
} catch (Exception e) {
h.setChangeAllowed(false);
beforeExn = e;
}
// in all cases (thanks to the above catch), try to restore the state
// if conn is null setup() was never called
boolean pristineState = conn == null;
// don't bother trying to restore state if the connection has been invalidated (by a
// recursive call)
if (!pristineState && h.hasValidConnection() && handler.canRestoreState()) {
h.setChangeAllowed(true);
try {
handler.restoreState(conn);
pristineState = true;
} catch (Exception e) {
afterExn = e;
}
h.setChangeAllowed(false);
}
// ATTN conn can be null (return/closeConnection() accept it)
if (h.pop()) {
// remove if this thread has no handlers left
this.handlers.remove(Thread.currentThread());
if (connOutsidePool) {
conn.close();
} else if (pristineState) {
this.returnConnection(h.getConnection());
} else {
this.closeConnection(h.invalidConnection());
}
} else {
assert !connOutsidePool;
// connection is still used
if (!pristineState) {
this.closeConnection(h.invalidConnection());
}
// else the top handler will release the connection
}
if (beforeExn != null) {
assert computeExn == null : "Compute shouldn't be attempted if setup fails : " + beforeExn + " " + computeExn;
if (afterExn != null) {
throw new SQLException("could not restore state after failed setup : " + ExceptionUtils.getStackTrace(afterExn), beforeExn);
} else {
throw ExceptionUtils.throwExn(beforeExn, SQLException.class, RuntimeException.class);
}
} else if (afterExn != null) {
if (computeExn != null) {
throw new SQLException("could not restore state after successful setup : " + ExceptionUtils.getStackTrace(afterExn), computeExn);
} else {
throw ExceptionUtils.throwExn(afterExn, SQLException.class, RuntimeException.class);
}
} else {
return handler.get();
}
}
// this method create a Statement, don't forget to close it when you're done
private Object[] executeTwice(QueryInfo queryInfo) throws SQLException {
final String query = queryInfo.getQuery();
Object[] res;
try {
res = executeOnce(query, queryInfo.getConnection());
} catch (SQLException exn) {
if (State.DEBUG)
State.INSTANCE.addFailedRequest(query);
// only retry for transient errors
final boolean retry;
if (exn instanceof SQLTransientException) {
retry = true;
} else if (exn instanceof SQLNonTransientException) {
retry = false;
} else if (getSystem() == SQLSystem.H2) {
// 1. server was killed, maybe it will be restarted
// 2. client network interface was brought down, maybe it will be brought up again
retry = exn.getErrorCode() == ErrorCode.CONNECTION_BROKEN_1;
} else if (getSystem() == SQLSystem.POSTGRESQL) {
// Class 08 — Connection Exception (e.g. SocketException)
// Class 57 — Operator Intervention (e.g. server shutdown)
retry = exn.getSQLState().startsWith("08") || exn.getSQLState().startsWith("57");
} else {
retry = getSystem() == SQLSystem.MYSQL;
}
// maybe this was a network problem, so wait a little
final int retryWait = this.retryWait;
if (!retry || retryWait < 0 || !queryInfo.canObtainNewConnection())
throw exn;
try {
Thread.sleep(retryWait);
} catch (InterruptedException e) {
throw new RTInterruptedException(e.getMessage() + " : " + query, exn);
}
// and try to obtain a new connection
try {
final Connection otherConn = queryInfo.obtainNewConnection();
res = executeOnce(query, otherConn);
} catch (Exception e) {
if (e == exn)
throw exn;
else
throw new SQLException("second exec failed: " + e.getLocalizedMessage(), exn);
}
// only log if the second succeeds (otherwise it's thrown)
Log.get().log(Level.INFO, "executeOnce() failed for " + queryInfo, exn);
}
return res;
}
private static final Tuple2<Long, int[]> NO_QUERIES_RES = Tuple2.create(0l, new int[0]);
/**
* Execute multiple queries in batch.
*
* @param queries what to execute.
* @param atomic <code>true</code> if all queries should be executed in a transaction.
* @return the total update count (< 0 if unknown), followed by the individual update counts.
* @throws SQLException if an error occurs.
* @see Statement#executeBatch()
*/
public final Tuple2<Long, int[]> executeBatch(final List<String> queries, final boolean atomic) throws SQLException {
if (queries.isEmpty())
return NO_QUERIES_RES;
final long timeMs = System.currentTimeMillis();
final long time = System.nanoTime();
final long afterCache = time;
final AtomicLong afterQueryInfo = new AtomicLong();
final AtomicLong afterExecute = new AtomicLong();
final AtomicReference<Connection> conn = new AtomicReference<>();
final ConnectionHandlerNoSetup<int[], SQLException> handler = new ConnectionHandlerNoSetup<int[], SQLException>() {
@Override
public int[] handle(SQLDataSource ds) throws SQLException {
afterQueryInfo.set(System.nanoTime());
conn.set(ds.getConnection());
final int[] res;
try (final Statement stmt = conn.get().createStatement()) {
for (final String s : queries) {
stmt.addBatch(s);
}
if (Thread.currentThread().isInterrupted())
throw new RTInterruptedException("Interrupted before executing : " + queries);
res = stmt.executeBatch();
}
afterExecute.set(System.nanoTime());
return res;
}
};
final int[] res = atomic ? SQLUtils.executeAtomic(this, handler) : this.useConnection(handler);
long totalCount = 0;
int i = 0;
for (final int count : res) {
if (count == Statement.SUCCESS_NO_INFO) {
totalCount = -1;
break;
} else {
if (count < 0)
throw new SQLException("Invalid count (" + count + ") for query " + i + " : " + queries.get(i));
totalCount += count;
}
i++;
}
if (SQLRequestLog.isEnabled()) {
final long afterHandle = System.nanoTime();
SQLRequestLog.log(queries.toString(), "executeBatch", conn.get(), timeMs, time, afterCache, afterQueryInfo.get(), afterExecute.get(), afterHandle, afterHandle, (int) totalCount);
}
return Tuple2.create(totalCount, res);
}
/**
* Try to execute a {@link #getValidationQuery() simple query} on the database server. This
* method even works when the pool is exhausted.
*
* @throws SQLException if the query couldn't be executed.
*/
public final void validateDBConnectivity() throws SQLException {
this.validateDBConnectivity(this.getValidationQueryTimeout());
}
public final void validateDBConnectivity(final int timeout) throws SQLException {
this.useConnection(new ConnectionHandlerNoSetup<Object, SQLException>() {
@Override
public Object handle(SQLDataSource ds) throws SQLException {
final Statement stmt = ds.getConnection().createStatement();
try {
stmt.setQueryTimeout(timeout);
final ResultSet rs = stmt.executeQuery(ds.getValidationQuery());
if (!rs.next())
throw new SQLException("No row returned");
rs.close();
} finally {
stmt.close();
}
return null;
}
}, true);
}
private Object[] executeOnce(String query, Connection c) throws SQLException {
final Statement stmt = c.createStatement();
final ResultSet rs = execute(query, stmt);
return new Object[] { stmt, rs };
}
/**
* Exécute la requête et retourne le résultat. Attention le resultSet peut cesser d'être valide
* a tout moment, de plus cette méthode ne ferme pas le statement qu'elle crée, la méthode
* préférée est execute()
*
* @param query le requête à exécuter.
* @return le résultat de la requête.
* @deprecated replaced by execute().
* @see #execute(String)
*/
public ResultSet executeRaw(String query) {
try {
return execute(query, this.getStatement());
} catch (SQLException e) {
try {
return execute(query, this.getStatement());
} catch (SQLException ex) {
ExceptionHandler.handle("Impossible d'executer la query: " + query, ex);
return null;
}
}
}
/**
* Retourne un nouveau statement. Attention, la fermeture est à la charge de l'appelant.
*
* @return un nouveau statement.
* @throws SQLException if an error occurs.
*/
private Statement getStatement() throws SQLException {
return this.getConnection().createStatement();
}
/**
* Execute la requete avec le statement passé. Attention cette méthode ne peut fermer le
* statement car elle retourne directement le resultSet.
*
* @param query le requête à exécuter.
* @param stmt le statement.
* @return le résultat de la requête, should never be null according to the spec but Derby don't
* care.
* @throws SQLException si erreur lors de l'exécution de la requête.
*/
private ResultSet execute(String query, Statement stmt) throws SQLException, RTInterruptedException {
// System.err.println("\n" + count + "*** " + query + "\n");
if (State.DEBUG)
State.INSTANCE.beginRequest(query);
// test before calling JDBC methods and creating threads
boolean interrupted = false;
if (QUERY_TUNING > 0) {
try {
Thread.sleep(QUERY_TUNING);
} catch (InterruptedException e1) {
interrupted = true;
}
} else {
interrupted = Thread.currentThread().isInterrupted();
}
if (interrupted) {
throw new RTInterruptedException("request interrupted : " + query);
}
final long t1 = System.currentTimeMillis();
ResultSet rs = null;
try {
// MAYBE un truc un peu plus formel
if (query.startsWith("INSERT") || query.startsWith("UPDATE") || query.startsWith("DELETE") || query.startsWith("CREATE") || query.startsWith("ALTER") || query.startsWith("DROP")
|| query.startsWith("SET")) {
// MS SQL doesn't support UPDATE
final boolean returnGenK = query.startsWith("INSERT") && stmt.getConnection().getMetaData().supportsGetGeneratedKeys();
stmt.executeUpdate(query, returnGenK ? Statement.RETURN_GENERATED_KEYS : Statement.NO_GENERATED_KEYS);
rs = returnGenK ? stmt.getGeneratedKeys() : null;
} else {
// TODO en faire qu'un seul par Connection
final ExecutorThread thr = new ExecutorThread(stmt, query);
// on lance l'exécution
thr.start();
// et on attend soit qu'elle finisse soit qu'on soit interrompu
try {
rs = thr.getRs();
} catch (SQLException e) {
if (getSystem() == SQLSystem.MYSQL && e.getErrorCode() == 1317) {
thr.stopQuery();
throw new InterruptedQuery("request interrupted : " + query, e, thr);
} else {
throw e;
}
} catch (InterruptedException e) {
thr.stopQuery();
throw new InterruptedQuery("request interrupted : " + query, e, thr);
}
}
} finally {
if (State.DEBUG)
State.INSTANCE.endRequest(query);
}
long t2 = System.currentTimeMillis();
// obviously very long queries tend to last longer, that's normal so don't warn
if (t2 - t1 > 1000 && query.length() < 1000) {
System.err.println("Warning:" + (t2 - t1) + "ms pour :" + query);
}
count++;
return rs;
}
private final class InterruptedQuery extends RTInterruptedException {
private final ExecutorThread thread;
InterruptedQuery(String message, Throwable cause, ExecutorThread thr) {
super(message, cause);
this.thread = thr;
}
public final ExecutorThread getThread() {
return this.thread;
}
}
private static int executorSerial = 0;
private final class ExecutorThread extends Thread {
private final Statement stmt;
private final String query;
private ResultSet rs;
private Exception exn;
private boolean canceled;
public ExecutorThread(Statement stmt, String query) {
super(executorSerial++ + " ExecutorThread on " + query);
this.stmt = stmt;
this.query = query;
this.canceled = false;
}
public void run() {
synchronized (this) {
if (this.canceled)
return;
}
ResultSet rs = null;
try {
// do not use executeQuery since this.query might contain several statements
this.stmt.execute(this.query);
synchronized (this) {
if (this.canceled)
return;
}
rs = this.stmt.getResultSet();
} catch (Exception e) {
// can only be SQLException or RuntimeException
// eg MySQLStatementCancelledException if stopQuery() was called
this.exn = e;
}
this.rs = rs;
}
public void stopQuery() throws SQLException {
if (!this.stmt.isClosed())
this.stmt.cancel();
synchronized (this) {
this.canceled = true;
}
}
public ResultSet getRs() throws SQLException, InterruptedException {
this.join();
// pas besoin de synchronized puisque seule notre thread ecrit les var
// et qu'elle est maintenant terminée
if (this.exn != null) {
if (this.exn instanceof SQLException)
throw (SQLException) this.exn;
else
throw (RuntimeException) this.exn;
}
return this.rs;
}
}
/**
* All connections obtained with {@link #getConnection()} will be closed immediately, but
* threads in {@link #useConnection(ConnectionHandler)} will get to keep them. After the last
* thread returns from {@link #useConnection(ConnectionHandler)} there won't be any connection
* left open. This instance will be permanently closed, it cannot be reused later.
*
* @throws SQLException if a database error occurs
*/
public synchronized void close() throws SQLException {
this.sysRoot.rmListener(this.descL);
@SuppressWarnings("rawtypes")
final GenericObjectPool pool = this.connectionPool;
super.close();
// super close and unset our pool, but we need to keep it
// to allow used connections to be closed, see #closeConnection(Connection)
this.connectionPool = pool;
// interrupt to force waiting threads to close their connections
if (this.exec != null) {
this.exec.shutdownNow();
this.exec = null;
}
// uptodate was cleared by closeConnection()
// the handlers will clear themselves
// the cache is expected to be cleared (when all connections are closed)
if (this.getBorrowedConnectionCount() == 0)
noConnectionIsOpen();
// ATTN keep tables to be able to reopen
}
private synchronized void noConnectionIsOpen() {
assert this.connectionPool == null || (this.connectionPool.getNumIdle() + this.getBorrowedConnectionCount()) == 0;
if (this.cache != null)
this.cache.getSupp().die();
}
/**
* Retourne la connection à cette source de donnée.
*
* @return la connection à cette source de donnée.
* @throws IllegalStateException if not called from within useConnection().
* @see #useConnection(ConnectionHandler)
* @see #handlingConnection()
*/
public final Connection getConnection() {
final HandlersStack res = this.getHandlersStack();
if (res == null)
throw new IllegalStateException("useConnection() wasn't called");
return res.getConnection();
}
public final TransactionPoint getTransactionPoint() {
final HandlersStack handlersStack = this.getHandlersStack();
if (handlersStack == null)
return null;
return handlersStack.getLastTxPoint();
}
/**
* Retourne une connection à cette source de donnée (generally
* {@link #useConnection(ConnectionHandler)} should be used). If a connection in the pool fails
* to {@link #initConnection(Connection) initialize} or if the pool is empty and a new
* connection fails to get created, this method will try to borrow a connection from the pool a
* second time.
* <p>
* Note : you <b>must</b> return this connection (e.g. use try/finally).
* <p>
*
* @return une connection à cette source de donnée.
* @throws NoSuchElementException after {@link #getMaxWait()} milliseconds if the pool is
* exhausted and {@link #blocksWhenExhausted()}.
* @see #returnConnection(Connection)
* @see #closeConnection(Connection)
*/
protected final Connection getNewConnection() throws NoSuchElementException {
try {
return this.borrowConnection(false);
} catch (RTInterruptedException e) {
throw e;
} catch (Exception e) {
if (e instanceof NoSuchElementException) {
// no need to try to test all others connections, the pool is just exhausted
throw (NoSuchElementException) e;
} else {
return this.borrowConnection(true);
}
}
}
/**
* Borrow a new connection from the pool, optionally purging invalid connections with the
* validation query.
*
* @param test if <code>true</code> then testOnBorrow will be set.
* @return the new connection.
* @throws NoSuchElementException after {@link #getMaxWait()} milliseconds if the pool is
* exhausted and {@link #blocksWhenExhausted()}.
*/
private final Connection borrowConnection(final boolean test) throws NoSuchElementException {
if (test) {
synchronized (this.testLock) {
// invalidate all bad connections
setTestOnBorrow(true);
try {
return this._borrowConnection(test);
} finally {
setTestOnBorrow(false);
}
}
} else {
return this._borrowConnection(test);
}
}
private final Connection _borrowConnection(final boolean test) throws NoSuchElementException {
// when we call borrowConnection() with test, it's because there was an error so this
// call is already a second try, thus getRawConnection() shouldn't try a third time.
final Connection res = this.getRawConnection(!test);
try {
initConnection(res);
return res;
} catch (RuntimeException e) {
this.closeConnection(res);
throw e;
}
}
// initialize the passed connection if needed
protected final void initConnection(final Connection res) {
boolean setSchema = false;
String schemaToSet = null;
synchronized (this) {
if (!this.schemaUptodate.containsKey(res)) {
if (this.initialShemaSet) {
setSchema = true;
schemaToSet = this.initialShema;
}
// safe to put before setSchema() since res cannot be passed to
// release/closeConnection()
this.schemaUptodate.put(res, null);
}
// a connection from the pool is up to date since we close all idle connections in
// invalidateAllConnections() and borrowed ones are closed before they return to the
// pool
this.uptodate.put(res, null);
}
// warmup the connection (executing a bogus simple query, like "SELECT 1") could help but in
// general doesn't since we often do getDS().execute() and thus our warm up thread will run
// after the execute(), making it useless.
if (setSchema)
this.setSchema(schemaToSet, res);
}
private static final String pgInterrupted = GT.tr("Interrupted while attempting to connect.");
private void getRawConnectionThrow(final Exception e1, final Exception e2) throws NoSuchElementException {
if (e1.getCause() instanceof NoSuchElementException)
throw (NoSuchElementException) e1.getCause();
else if (e2 == null)
throw new IllegalStateException("Impossible d'obtenir une connexion sur " + this, e1);
else
throw new IllegalStateException("Impossible d'obtenir une connexion sur " + this + "après 2 essais\nexception 2 :" + e2.getLocalizedMessage(), e1);
}
private Connection getRawConnection(final boolean retry) throws NoSuchElementException {
assert !Thread.holdsLock(
this) : "super.getConnection() might block (see setWhenExhaustedAction()), and since return/closeConnection() need this lock, this method cannot wait while holding the lock";
Connection result = null;
try {
result = super.getConnection();
} catch (Exception e1) {
// try to know if interrupt, TODO cleanup : patch pg Driver.java to fill the cause
if (e1.getCause() instanceof InterruptedException || (e1 instanceof PSQLException && e1.getMessage().equals(pgInterrupted))) {
throw new RTInterruptedException(e1);
}
final int retryWait = retry ? this.retryWait : -1;
if (retryWait < 0 || e1 instanceof SQLNonTransientException)
getRawConnectionThrow(e1, null);
try {
// on attend un petit peu
Thread.sleep(retryWait);
// avant de réessayer
result = super.getConnection();
} catch (InterruptedException e) {
throw new RTInterruptedException("interrupted while waiting for a second try", e);
} catch (Exception e) {
getRawConnectionThrow(e1, e);
}
}
if (State.DEBUG)
State.INSTANCE.connectionCreated();
return result;
}
public final int getBorrowedConnectionCount() {
return this.connectionPool == null ? 0 : this.connectionPool.getNumActive();
}
public synchronized boolean blocksWhenExhausted() {
return this.blockWhenExhausted;
}
public synchronized void setBlockWhenExhausted(boolean block) {
this.blockWhenExhausted = block;
if (this.connectionPool != null) {
this.connectionPool.setWhenExhaustedAction(block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_GROW);
}
}
public synchronized final long getSoftMinEvictableIdleTimeMillis() {
return this.softMinEvictableIdleTimeMillis;
}
public synchronized final void setSoftMinEvictableIdleTimeMillis(long millis) {
this.softMinEvictableIdleTimeMillis = millis;
if (this.connectionPool != null) {
this.connectionPool.setSoftMinEvictableIdleTimeMillis(millis);
}
}
/**
* Whether the database defaut transaction isolation is check only once for this instance. If
* <code>false</code>, every new connection will have its
* {@link Connection#setTransactionIsolation(int) isolation set}. If <code>true</code> the
* isolation will only be set if the {@link #setInitialTransactionIsolation(int) requested one}
* differs from the DB one. In other words, if you want to optimize DB access, the DB
* configuration must match the datasource configuration.
*
* @param checkOnce <code>true</code> to check only once the DB transaction isolation.
*/
public synchronized final void setTransactionIsolationCheckedOnce(final boolean checkOnce) {
this.checkOnceDBTxIsolation = checkOnce;
this.dbTxIsolation = null;
}
public synchronized final boolean isTransactionIsolationCheckedOnce() {
return this.checkOnceDBTxIsolation;
}
// don't use setDefaultTransactionIsolation() in super since it makes extra requests each time a
// connection is borrowed
public final void setInitialTransactionIsolation(int level) {
if (level != Connection.TRANSACTION_READ_UNCOMMITTED && level != Connection.TRANSACTION_READ_COMMITTED && level != Connection.TRANSACTION_REPEATABLE_READ
&& level != Connection.TRANSACTION_SERIALIZABLE)
throw new IllegalArgumentException("Invalid value :" + level);
synchronized (this) {
if (this.txIsolation != level) {
this.txIsolation = level;
// perhaps do like setInitialSchema() : i.e. call setTransactionIsolation() on
// existing connections
this.invalidateAllConnections(false);
}
}
}
public synchronized final int getInitialTransactionIsolation() {
return this.txIsolation;
}
public synchronized final Integer getDBTransactionIsolation() {
return this.dbTxIsolation;
}
final synchronized void setTransactionIsolation(Connection conn) throws SQLException {
if (this.dbTxIsolation == null) {
this.dbTxIsolation = conn.getTransactionIsolation();
assert this.dbTxIsolation != null;
}
// no need to try to change the level if the DB doesn't support transactions
if (this.dbTxIsolation != Connection.TRANSACTION_NONE && (!this.checkOnceDBTxIsolation || this.dbTxIsolation != this.txIsolation)) {
// if not check once, it's the desired action, so don't log
if (this.checkOnceDBTxIsolation)
Log.get().config("Setting transaction isolation to " + this.txIsolation);
conn.setTransactionIsolation(this.txIsolation);
}
}
// allow to know transaction states
private final class TransactionPoolableConnection extends PoolableConnection {
// perhaps call getAutoCommit() once to have the initial value
@GuardedBy("this")
private boolean autoCommit = true;
private TransactionPoolableConnection(Connection conn, @SuppressWarnings("rawtypes") ObjectPool pool, AbandonedConfig config) {
super(conn, pool, config);
}
private HandlersStack getNonNullHandlersStack() throws SQLException {
final HandlersStack res = getHandlersStack();
if (res == null)
throw new SQLException("Unsafe transaction, call useConnection() or SQLUtils.executeAtomic()");
return res;
}
@Override
public synchronized void setAutoCommit(boolean autoCommit) throws SQLException {
if (this.autoCommit != autoCommit) {
// don't call setAutoCommit() if no stack
final HandlersStack handlersStack = getNonNullHandlersStack();
super.setAutoCommit(autoCommit);
this.autoCommit = autoCommit;
if (this.autoCommit)
// some delegates of the super implementation might have already called our
// commit(), but in this case, the following commit will be a no-op
handlersStack.commit(null);
else
handlersStack.addTxPoint(new TransactionPoint(this));
}
}
@Override
public synchronized void commit() throws SQLException {
super.commit();
assert !this.autoCommit;
final HandlersStack handlersStack = getNonNullHandlersStack();
handlersStack.commit(new TransactionPoint(this));
}
@Override
public synchronized void rollback() throws SQLException {
super.rollback();
assert !this.autoCommit;
final HandlersStack handlersStack = getNonNullHandlersStack();
handlersStack.rollback(new TransactionPoint(this));
}
@Override
public synchronized Savepoint setSavepoint() throws SQLException {
// don't call setSavepoint() if no stack
final HandlersStack handlersStack = getNonNullHandlersStack();
final Savepoint res = super.setSavepoint();
// MySQL always create named save points
handlersStack.addTxPoint(new TransactionPoint(this, res, getSystem() == SQLSystem.MYSQL));
return res;
}
@Override
public synchronized Savepoint setSavepoint(String name) throws SQLException {
// don't call setSavepoint() if no stack
final HandlersStack handlersStack = getNonNullHandlersStack();
final Savepoint res = super.setSavepoint(name);
handlersStack.addTxPoint(new TransactionPoint(this, res, true));
return res;
}
@Override
public synchronized void rollback(Savepoint savepoint) throws SQLException {
super.rollback(savepoint);
getNonNullHandlersStack().rollback(savepoint);
}
@Override
public synchronized void releaseSavepoint(Savepoint savepoint) throws SQLException {
super.releaseSavepoint(savepoint);
getNonNullHandlersStack().releaseSavepoint(savepoint);
}
}
@Override
protected void createPoolableConnectionFactory(ConnectionFactory driverConnectionFactory, @SuppressWarnings("rawtypes") KeyedObjectPoolFactory statementPoolFactory, AbandonedConfig configuration)
throws SQLException {
PoolableConnectionFactory connectionFactory = null;
try {
connectionFactory = new PoolableConnectionFactory(driverConnectionFactory, this.connectionPool, statementPoolFactory, this.validationQuery, this.validationQueryTimeout,
this.connectionInitSqls, this.defaultReadOnly, this.defaultAutoCommit, this.defaultTransactionIsolation, this.defaultCatalog, configuration) {
@Override
public Object makeObject() throws Exception {
Connection conn = this._connFactory.createConnection();
if (conn == null) {
throw new IllegalStateException("Connection factory returned null from createConnection");
}
initializeConnection(conn);
setTransactionIsolation(conn);
if (null != this._stmtPoolFactory) {
@SuppressWarnings("rawtypes")
KeyedObjectPool stmtpool = this._stmtPoolFactory.createPool();
conn = new PoolingConnection(conn, stmtpool);
stmtpool.setFactory((PoolingConnection) conn);
}
return new TransactionPoolableConnection(conn, this._pool, this._config);
}
};
validateConnectionFactory(connectionFactory);
} catch (RuntimeException e) {
throw e;
} catch (SQLException e) {
// only wrap if necessary (calling code can use SQLState)
throw e;
} catch (Exception e) {
throw new SQLException("Cannot create PoolableConnectionFactory", e);
}
}
@Override
protected void createConnectionPool() {
super.createConnectionPool();
// methods not defined in superclass and thus not called in super
this.connectionPool.setLifo(true);
this.setBlockWhenExhausted(this.blockWhenExhausted);
this.connectionPool.setSoftMinEvictableIdleTimeMillis(this.softMinEvictableIdleTimeMillis);
}
@Override
protected ConnectionFactory createConnectionFactory() throws SQLException {
final ConnectionFactory res = super.createConnectionFactory();
this.connectionFactory = res;
return res;
}
@Override
protected void createDataSourceInstance() throws SQLException {
// PoolingDataSource returns a PoolGuardConnectionWrapper that complicates a lot of
// things for nothing, so overload to simply return an object of the pool
this.dataSource = new PoolingDataSource(this.connectionPool) {
// we'll migrate to plain SQLException when our superclass does
@SuppressWarnings("deprecation")
@Override
public Connection getConnection() throws SQLException {
try {
return (Connection) this._pool.borrowObject();
} catch (SQLException e) {
throw e;
} catch (NoSuchElementException e) {
throw new SQLNestedException("Cannot get a connection, pool exhausted", e);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new SQLNestedException("Cannot get a connection, general error", e);
}
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
throw new UnsupportedOperationException();
}
};
}
/**
* To return a connection to the pool.
*
* @param con a connection obtained with getRawConnection(), can be <code>null</code>.
*/
protected void returnConnection(final Connection con) {
if (con != null) {
// if !this.initialShemaSet the out of date cannot be brought up to date
final boolean unrecoverableOutOfDate;
synchronized (this) {
unrecoverableOutOfDate = !this.uptodate.containsKey(con) || !this.initialShemaSet && !this.schemaUptodate.containsKey(con);
}
if (isClosed() || unrecoverableOutOfDate)
// if closed : don't fill the pool (which will have thrown an exception anyway)
// if we shouldn't set the schema, we must close all previous connections
// so that we get new ones from the db with the current setting
this.closeConnection(con);
else {
try {
// our connectionPool use PoolableConnectionFactory which creates
// PoolableConnection whose close() actually does a returnObject()
con.close();
} catch (Exception e) {
/* tant pis */
Log.get().log(Level.FINE, "Could not return " + con, e);
}
if (State.DEBUG)
State.INSTANCE.connectionRemoved();
}
}
}
/**
* To actually close a connection to the db (and remove it from the pool).
*
* @param con a connection obtained with getRawConnection(), can be <code>null</code>.
*/
protected void closeConnection(final Connection con) {
// Neither BasicDataSource nor PoolingDataSource provide a closeConnection()
// so we implement one here
if (con != null) {
synchronized (this) {
this.uptodate.remove(con);
this.schemaUptodate.remove(con);
}
try {
// ATTN this always does _numActive--, so we can't call it multiple times
// with the same object
this.connectionPool.invalidateObject(con);
} catch (Exception e) {
/* tant pis */
Log.get().log(Level.FINE, "Could not close " + con, e);
}
// the last connection is being returned
if (this.isClosed() && this.getBorrowedConnectionCount() == 0) {
noConnectionIsOpen();
}
}
}
/**
* Invalidates all open connections. This immediately closes idle connections. Borrowed ones are
* marked as invalid, so that they are closed on return. In other words, after this method
* returns, no existing connection will be provided.
*/
public final void invalidateAllConnections() {
this.invalidateAllConnections(false);
}
public final void invalidateAllConnections(final boolean preventIdleConnections) {
// usefull since Evictor of GenericObjectPool might call ensureMinIdle()
if (preventIdleConnections) {
this.setMinIdle(0);
this.setMaxIdle(0);
}
synchronized (this) {
// otherwise nothing to invalidate
if (this.connectionPool != null) {
// closes all idle connections
this.connectionPool.clear();
// borrowed connections will be closed on return
this.uptodate.clear();
}
}
}
/**
* From now on, every new connection will have its default schema set to schemaName.
*
* @param schemaName the name of the initial default schema, <code>null</code> to remove any
* default schema.
*/
public void setInitialSchema(String schemaName) {
if (schemaName != null || this.getSystem().isClearingPathSupported()) {
this.setInitialSchema(true, schemaName);
} else if (this.getSystem().isDBPathEmpty()) {
this.unsetInitialSchema();
} else
throw new IllegalArgumentException(this + " cannot have no default schema");
}
/**
* From now on, connections won't have their default schema set by this. Of course the SQL
* server might have set one.
*/
public void unsetInitialSchema() {
this.setInitialSchema(false, null);
}
private final void setInitialSchema(final boolean set, final String schemaName) {
synchronized (this.setInitialShemaLock) {
synchronized (this) {
// even if schemaName no longer exists, and thus the following test would fail, the
// next initConnection() will correctly fail
if (this.initialShemaSet == set && CompareUtils.equals(this.initialShema, schemaName))
return;
}
final Connection newConn;
if (set) {
// test if schemaName is valid
newConn = this.getNewConnection();
try {
this.setSchema(schemaName, newConn);
} catch (RuntimeException e) {
this.closeConnection(newConn);
throw e;
}
// don't return connection right now otherwise it might be deemed unrecoverable
} else {
newConn = null;
}
synchronized (this) {
this.initialShemaSet = set;
this.initialShema = schemaName;
this.schemaUptodate.clear();
if (!set)
// by definition we don't want to modify the connection,
// so empty the pool, that way new connections will be created
// the borrowed ones will be closed when returned
this.connectionPool.clear();
else
this.schemaUptodate.put(newConn, null);
}
this.returnConnection(newConn);
}
}
public synchronized final String getInitialSchema() {
return this.initialShema;
}
/**
* Set the default schema of the current thread's connection. NOTE: pointless if not in
* {@link #useConnection(ConnectionHandler)} since otherwise a connection will be borrowed then
* closed.
*
* @param schemaName the name of the new default schema.
*/
public void setSchema(String schemaName) {
this.setSchema(schemaName, null);
}
private void setSchema(String schemaName, Connection c) {
final String q;
if (this.getSystem() == SQLSystem.MYSQL) {
if (schemaName == null) {
if (this.getSchema(c) != null)
throw new IllegalArgumentException("cannot unset DATABASE in MySQL");
else
// nothing to do
q = null;
} else
q = "USE " + schemaName;
} else if (this.getSystem() == SQLSystem.DERBY) {
q = "SET SCHEMA " + SQLBase.quoteIdentifier(schemaName);
} else if (this.getSystem() == SQLSystem.H2) {
q = "SET SCHEMA " + SQLBase.quoteIdentifier(schemaName);
// TODO use the line below, but for now it is only used after schema()
// plus there's no function to read it back
// q = "set SCHEMA_SEARCH_PATH " + SQLBase.quoteIdentifier(schemaName == null ? "" :
// schemaName);
} else if (this.getSystem() == SQLSystem.POSTGRESQL) {
if (schemaName == null) {
// SET cannot empty the path
q = "select set_config('search_path', '', false)";
} else {
q = "set session search_path to " + SQLBase.quoteIdentifier(schemaName);
}
} else if (this.getSystem() == SQLSystem.MSSQL) {
if (schemaName == null) {
throw new IllegalArgumentException("cannot unset default schema in " + this.getSystem());
} else {
// ATTN MSSQL apparently hang until the connection that created the schema commits
q = "ALTER USER " + SQLBase.quoteIdentifier(getUsername()) + " with default_schema = " + SQLBase.quoteIdentifier(schemaName);
}
} else {
throw new UnsupportedOperationException();
}
if (q != null)
this.execute(q, null, true, c);
}
public final String getSchema() {
return this.getSchema(null);
}
private String getSchema(Connection c) {
final String q;
if (this.getSystem() == SQLSystem.MYSQL)
q = "select DATABASE(); ";
else if (this.getSystem() == SQLSystem.DERBY)
q = "select CURRENT SCHEMA;";
else if (this.getSystem() == SQLSystem.POSTGRESQL) {
q = "select (current_schemas(false))[1];";
} else if (this.getSystem() == SQLSystem.H2) {
q = "select SCHEMA();";
} else if (this.getSystem() == SQLSystem.MSSQL) {
q = "select SCHEMA_NAME();";
} else
throw new UnsupportedOperationException();
return (String) this.execute(q, SCALAR_HANDLER, c);
}
@Override
public String toString() {
return this.getUrl();
}
public final SQLSystem getSystem() {
return this.sysRoot.getServer().getSQLSystem();
}
public Object clone() {
SQLDataSource ds = new SQLDataSource(this.sysRoot);
ds.setUrl(this.getUrl());
ds.setUsername(this.getUsername());
ds.setPassword(this.getPassword());
ds.setDriverClassName(this.getDriverClassName());
return ds;
}
}