| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- package org.es.jdbc.api;
- import org.elasticsearch.client.Client;
- import org.es.jdbc.es.ElasticClientProvider;
- import org.es.sql.utils.PersistLogger;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import java.sql.Connection;
- import java.sql.SQLException;
- public class ElasticSingleConnectionDataSource extends DriverManagerDataSource implements SmartDataSource {
- private final Object connectionMonitor = new Object();
- private boolean suppressClose;
- private Connection target;
- private Connection connection;
- private Client client;
- private ElasticClientProvider elasticClientProvider;
- public ElasticSingleConnectionDataSource() {
- }
- public ElasticSingleConnectionDataSource(String url, boolean suppressClose) {
- super(url);
- this.suppressClose = suppressClose;
- }
- public void setElasticClientProvider(ElasticClientProvider elasticClientProvider) {
- this.elasticClientProvider = elasticClientProvider;
- }
- protected boolean isSuppressClose() {
- return this.suppressClose;
- }
- public void setSuppressClose(boolean suppressClose) {
- this.suppressClose = suppressClose;
- }
- @Override
- public Connection getConnection() throws SQLException {
- synchronized (this.connectionMonitor) {
- if (this.connection == null) {
- initConnection();
- }
- if (this.connection.isClosed()) {
- throw new SQLException(
- "Connection was closed in ElasticSingleConnectionDataSource. Check that user code checks " +
- "shouldClose() before closing Connections, or set 'suppressClose' to 'true'");
- }
- return this.connection;
- }
- }
- @Override
- public Connection getConnection(String username, String password) throws SQLException {
- return getConnection();
- }
- public boolean shouldClose(Connection con) {
- synchronized (this.connectionMonitor) {
- return (con != this.connection && con != this.target);
- }
- }
- public void destroy() {
- synchronized (this.connectionMonitor) {
- closeConnection();
- }
- }
- public void initConnection() throws SQLException {
- if (getUrl() == null) {
- throw new IllegalStateException("'url' property is required for lazily initializing a Connection");
- }
- synchronized (this.connectionMonitor) {
- closeConnection();
- try {
- if (elasticClientProvider != null) {
- client = elasticClientProvider.createElasticClientFromUrl(getUrl());
- if (client == null) {
- throw new SQLException(String.format("Failed to build elastic client for url[%s]", getUrl()));
- }
- target = new ElasticConnection(getUrl(), null, client);
- }
- else {
- this.target = getConnectionFromDriver();
- }
- }
- catch (Exception exp) {
- throw new SQLException(String.format("Failed to create connection for url[%s]", getUrl()), exp);
- }
- prepareConnection(target);
- this.connection = (isSuppressClose() ? getCloseSuppressingConnectionProxy(target) : target);
- }
- }
- public void resetConnection() {
- synchronized (this.connectionMonitor) {
- closeConnection();
- this.target = null;
- this.connection = null;
- }
- }
- protected void prepareConnection(Connection con) throws SQLException {
- Boolean autoCommit = getAutoCommitValue();
- if (autoCommit != null && con.getAutoCommit() != autoCommit) {
- con.setAutoCommit(autoCommit);
- }
- }
- private void closeConnection() {
- if (this.target != null) {
- try {
- this.target.close();
- }
- catch (Throwable ex) {
- PersistLogger.warn(this, "Could not close shared JDBC Connection", ex);
- }
- }
- if (client != null) {
- try {
- client.close();
- }
- catch (Exception ex) {
- PersistLogger.error(this, "Could not close elasticsearch client", ex);
- }
- }
- }
- protected Connection getCloseSuppressingConnectionProxy(Connection target) {
- return (Connection) Proxy.newProxyInstance(
- ConnectionProxy.class.getClassLoader(),
- new Class[]{ConnectionProxy.class},
- new CloseSuppressingInvocationHandler(target));
- }
- protected Boolean getAutoCommitValue() {
- return Boolean.FALSE;
- }
- public void setAutoCommit(boolean autoCommit) {
- // ignore
- }
- private static class CloseSuppressingInvocationHandler implements InvocationHandler {
- private final Connection target;
- public CloseSuppressingInvocationHandler(Connection target) {
- this.target = target;
- }
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // Invocation on ConnectionProxy interface coming in...
- if (method.getName().equals("equals")) {
- // Only consider equal when proxies are identical.
- return (proxy == args[0]);
- }
- else if (method.getName().equals("hashCode")) {
- // Use hashCode of Connection proxy.
- return System.identityHashCode(proxy);
- }
- else if (method.getName().equals("unwrap")) {
- if (((Class) args[0]).isInstance(proxy)) {
- return proxy;
- }
- }
- else if (method.getName().equals("isWrapperFor")) {
- if (((Class) args[0]).isInstance(proxy)) {
- return true;
- }
- }
- else if (method.getName().equals("close")) {
- // Handle close method: don't pass the call on.
- return null;
- }
- else if (method.getName().equals("isClosed")) {
- return false;
- }
- else if (method.getName().equals("getTargetConnection")) {
- // Handle getTargetConnection method: return underlying Connection.
- return this.target;
- }
- // Invoke method on target Connection.
- try {
- return method.invoke(this.target, args);
- }
- catch (InvocationTargetException ex) {
- throw ex.getTargetException();
- }
- }
- }
- }
|