chennan 8 лет назад
Родитель
Сommit
64191eeb75

+ 17 - 0
src/main/java/org/es/lu/CommonConfig.java

@@ -0,0 +1,17 @@
+package org.es.lu;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class CommonConfig {
+    public boolean getClusterReadable(String clusterKey) {
+        if ("BX".equalsIgnoreCase(clusterKey)) {
+            return true;
+        }
+
+        if ("YP".equalsIgnoreCase(clusterKey)) {
+            return false;
+        }
+        return false;
+    }
+}

+ 195 - 0
src/main/java/org/es/lu/ElasticsearchCluster.java

@@ -0,0 +1,195 @@
+package org.es.lu;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.es.mapping.utils.StringUtils;
+import org.es.sql.utils.Constants;
+import org.es.sql.utils.EsPersistLogger;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticsearchCluster implements InitializingBean, DisposableBean {
+
+    private static final String ELASTIC_SEARCH_DRIVER_PREFIX = "jdbc:elastic:";
+
+    private CommonConfig commonConfig;
+
+    private TransportClient client;
+
+    private ClusterIndicesStateKeeper indicesStateKeeper;
+
+    private String clusterKey;
+
+    public ElasticsearchCluster(String url) {
+        client = internalBuildElasticClient(url);
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        indicesStateKeeper.stop();
+        indicesStateKeeper = null;
+    }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        indicesStateKeeper = new ClusterIndicesStateKeeper(client);
+        indicesStateKeeper.start();
+    }
+
+    public TransportClient client() {
+        return client;
+    }
+
+    public boolean isClusterReadable() {
+        return commonConfig.getClusterReadable(this.clusterKey);
+    }
+
+    public void setCommonConfig(CommonConfig commonConfig) {
+        this.commonConfig = commonConfig;
+    }
+
+    public IndexState indexState(String indexName) {
+        if (indicesStateKeeper.isIndicesStateCacheMissing()) {
+            return IndexState.newIndexState(indexName, IndexState.IndexStatus.NO_NODE_AVAILABLE);
+        }
+        IndexState indexState = indicesStateKeeper.indicesStateCache().get(indexName);
+        if (indexState == null) {
+            indexState = IndexState.newIndexState(indexName, IndexState.IndexStatus.INDEX_MISSING);
+        }
+        return indexState;
+    }
+
+    public boolean isIndexWritable(String indexName) {
+        if (StringUtils.isEmpty(indexName)) {
+            return false;
+        }
+        IndexState indexState = indexState(indexName);
+        return (IndexState.IndexStatus.GREEN == indexState.getIndexStatus()
+                || IndexState.IndexStatus.YELLOW == indexState.getIndexStatus());
+    }
+
+    public boolean isIndicesWritable(String... indices) {
+        if (indices == null || indices.length == 0) {
+            return false;
+        }
+        for (String indexName : indices) {
+            if (!isIndexWritable(indexName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public String getClusterKey() {
+        return clusterKey;
+    }
+
+    public void setClusterKey(String clusterKey) {
+        this.clusterKey = clusterKey;
+    }
+
+    private TransportClient internalBuildElasticClient(String url) {
+        String ipUrl = url.substring(ELASTIC_SEARCH_DRIVER_PREFIX.length());
+        Settings.Builder settingBuilder = Settings.builder();
+        settingBuilder.put("client.transport.sniff", true);
+
+        String hostListString = ipUrl;
+        int clusterNamePosIdx = ipUrl.lastIndexOf("/");
+        if (clusterNamePosIdx >= 0) {
+            hostListString = hostListString.substring(0, clusterNamePosIdx);
+            settingBuilder.put("cluster.name", ipUrl.substring(clusterNamePosIdx + 1));
+        }
+        else {
+            settingBuilder.put("client.transport.ignore_cluster_name", true);
+        }
+
+        List<InetSocketTransportAddress> addressList = Lists.newLinkedList();
+        String[] connStringList = hostListString.split(Constants.COMMA);
+        for (String connStr : connStringList) {
+            String[] connArr = connStr.split(Constants.COLON);
+            if (connArr.length == 1) {
+                addressList.add(new InetSocketTransportAddress(new InetSocketAddress(connArr[0], 9300)));
+            }
+            else {
+                addressList.add(new InetSocketTransportAddress(new InetSocketAddress(connArr[0], Integer.parseInt(connArr[1]))));
+            }
+        }
+        return new PreBuiltTransportClient(settingBuilder.build())
+                .addTransportAddresses(addressList.toArray(new InetSocketTransportAddress[addressList.size()]));
+    }
+
+    private class ClusterIndicesStateKeeper implements Runnable {
+
+        private TransportClient client;
+
+        private volatile boolean indicesStateCacheMissing = false;
+
+        private volatile ImmutableMap<String, IndexState> indicesStateCache;
+
+        private ScheduledExecutorService executorService;
+
+        public ClusterIndicesStateKeeper(TransportClient client) {
+            this.client = client;
+        }
+
+        public void stop() throws Exception {
+            executorService.shutdownNow();
+            executorService = null;
+        }
+
+        public void start() throws Exception {
+            indicesStateCache = newIndicesStateCacheInstance(client);
+            executorService = Executors.newScheduledThreadPool(2);
+            executorService.schedule(this, 3, TimeUnit.SECONDS);
+        }
+
+        public ImmutableMap<String, IndexState> indicesStateCache() {
+            return indicesStateCache;
+        }
+
+        public boolean isIndicesStateCacheMissing() {
+            return indicesStateCacheMissing;
+        }
+
+        @Override
+        public void run() {
+            indicesStateCache = newIndicesStateCacheInstance(client);
+            if (!executorService.isShutdown()) {
+                executorService.schedule(this, 10, TimeUnit.SECONDS);
+            }
+        }
+
+        private ImmutableMap<String, IndexState> newIndicesStateCacheInstance(TransportClient client) {
+            try {
+                ImmutableMap.Builder<String, IndexState> cacheBuilder = ImmutableMap.<String, IndexState>builder();
+                GetIndexResponse getIndexResponse = client.admin().indices().prepareGetIndex().execute().actionGet();
+
+                ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth(getIndexResponse.indices()).execute().actionGet();
+                for (String index : clusterHealthResponse.getIndices().keySet()) {
+                    ClusterHealthStatus indexStatus = clusterHealthResponse.getIndices().get(index).getStatus();
+                    cacheBuilder.put(index, IndexState.newIndexState(index, IndexState.IndexStatus.fromString(indexStatus.name())));
+                }
+                indicesStateCacheMissing = false;
+                return cacheBuilder.build();
+            }
+            catch (Exception ex) {
+                EsPersistLogger.warn(this, "Failed to get index state.", ex);
+                indicesStateCacheMissing = true;
+                return ImmutableMap.<String, IndexState>builder().build();
+            }
+        }
+    }
+}

+ 64 - 0
src/main/java/org/es/lu/ElasticsearchClusterManager.java

@@ -0,0 +1,64 @@
+package org.es.lu;
+
+import com.google.common.collect.Lists;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Component
+public class ElasticsearchClusterManager {
+
+    private AtomicInteger requestCount = new AtomicInteger(0);
+
+    @Autowired
+    @Qualifier("cluster01")
+    private ElasticsearchCluster cluster01;
+
+    @Autowired
+    @Qualifier("cluster02")
+    private ElasticsearchCluster cluster02;
+
+    public ElasticsearchCluster[] getReadableClusters() {
+        List<ElasticsearchCluster> clusterList = Lists.newArrayList();
+        if (cluster01.isClusterReadable()) {
+            clusterList.add(cluster01);
+        }
+
+        if (cluster02.isClusterReadable()) {
+            clusterList.add(cluster02);
+        }
+        return clusterList.toArray(new ElasticsearchCluster[clusterList.size()]);
+    }
+
+    public ElasticsearchCluster getCluster(String clusterKey) {
+        if (cluster01.getClusterKey().equalsIgnoreCase(clusterKey)) {
+            return cluster01;
+        }
+
+        if (cluster02.getClusterKey().equalsIgnoreCase(clusterKey)) {
+            return cluster02;
+        }
+        return null;
+    }
+
+    public ElasticsearchCluster getReadableCluster() {
+        ElasticsearchCluster[] clusters = getReadableClusters();
+        if (clusters.length == 0) {
+            return null;
+        }
+        if (clusters.length == 1) {
+            return clusters[0];
+        }
+
+        int index = requestCount.incrementAndGet();
+        if (index < 0) {
+            index = 0;
+            requestCount.set(index);
+        }
+        return clusters[index % clusters.length];
+    }
+
+}

+ 54 - 0
src/main/java/org/es/lu/IndexState.java

@@ -0,0 +1,54 @@
+package org.es.lu;
+
+import com.google.gson.Gson;
+
+public class IndexState {
+
+    private String indexName;
+
+    private IndexStatus indexStatus;
+
+    private IndexState(String indexName, IndexStatus indexStatus) {
+        this.indexName = indexName;
+        this.indexStatus = indexStatus;
+    }
+
+    public enum IndexStatus {
+        GREEN,
+        YELLOW,
+        RED,
+        INDEX_MISSING,
+        NO_NODE_AVAILABLE;
+
+        public static IndexStatus fromString(String statusName) {
+            for (IndexStatus indexStatus : IndexStatus.values()) {
+                if (indexStatus.name().equalsIgnoreCase(statusName)) {
+                    return indexStatus;
+                }
+            }
+            return null;
+        }
+    }
+
+    public String getIndexName() {
+        return indexName;
+    }
+
+    public IndexStatus getIndexStatus() {
+        return indexStatus;
+    }
+
+    public boolean isWriteable() {
+        return getIndexStatus() == IndexStatus.GREEN
+                || getIndexStatus() == IndexStatus.YELLOW;
+    }
+
+    @Override
+    public String toString() {
+        return new Gson().toJson(this, IndexState.class);
+    }
+
+    public static IndexState newIndexState(String index, IndexStatus indexStatus) {
+        return new IndexState(index, indexStatus);
+    }
+}

+ 53 - 0
src/test/java/org/es/test/lu/ElasticsearchClusterTest.java

@@ -0,0 +1,53 @@
+package org.es.test.lu;
+
+import org.es.lu.ElasticsearchCluster;
+import org.es.lu.ElasticsearchClusterManager;
+import org.es.lu.IndexState;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import java.util.concurrent.TimeUnit;
+
+public class ElasticsearchClusterTest {
+
+    private ClassPathXmlApplicationContext applicationContext;
+
+    private ElasticsearchClusterManager clusterManager;
+
+    @Before
+    public void initSpringContext() {
+        applicationContext = new ClassPathXmlApplicationContext("application-lu.xml");
+        clusterManager = (ElasticsearchClusterManager) applicationContext.getBean("elasticsearchClusterManager");
+    }
+
+    @Test
+    public void test_initCluster() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            ElasticsearchCluster cluster = clusterManager.getCluster("BX");
+            IndexState indexState = cluster.indexState("index");
+            System.out.println(cluster.getClusterKey() + " ===========>" + indexState);
+
+            cluster = clusterManager.getCluster("YP");
+            indexState = cluster.indexState(".custom-dictionary");
+            System.out.println(cluster.getClusterKey() + " ===========>" + indexState);
+
+            TimeUnit.SECONDS.sleep(2);
+        }
+    }
+
+    @Test
+    public void test_getReadableCluster() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            ElasticsearchCluster cluster = clusterManager.getReadableCluster();
+            System.out.println(cluster.getClusterKey());
+
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
+    @Test
+    public void test_copyMetadata() {
+        ElasticsearchCluster cluster = clusterManager.getCluster("BX");
+    }
+}

+ 23 - 0
src/test/resources/application-lu.xml

@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+     http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+     http://www.springframework.org/schema/context
+     http://www.springframework.org/schema/context/spring-context-2.5.xsd">
+
+    <context:component-scan base-package="org.es"/>
+
+    <bean id="cluster01" class="org.es.lu.ElasticsearchCluster">
+        <constructor-arg index="0" value="jdbc:elastic:192.168.0.125:9300/lu-search-cluster"/>
+        <property name="clusterKey" value="YP" />
+        <property name="commonConfig" ref="commonConfig"/>
+    </bean>
+
+    <bean id="cluster02" class="org.es.lu.ElasticsearchCluster">
+        <constructor-arg index="0" value="jdbc:elastic:192.168.0.124:9300/lu-search-cluster"/>
+        <property name="clusterKey" value="BX" />
+        <property name="commonConfig" ref="commonConfig"/>
+    </bean>
+</beans>

+ 23 - 0
target/test-classes/application-lu.xml

@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns="http://www.springframework.org/schema/beans"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+     http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+     http://www.springframework.org/schema/context
+     http://www.springframework.org/schema/context/spring-context-2.5.xsd">
+
+    <context:component-scan base-package="org.es"/>
+
+    <bean id="cluster01" class="org.es.lu.ElasticsearchCluster">
+        <constructor-arg index="0" value="jdbc:elastic:192.168.0.125:9300/lu-search-cluster"/>
+        <property name="clusterKey" value="YP" />
+        <property name="commonConfig" ref="commonConfig"/>
+    </bean>
+
+    <bean id="cluster02" class="org.es.lu.ElasticsearchCluster">
+        <constructor-arg index="0" value="jdbc:elastic:192.168.0.124:9300/lu-search-cluster"/>
+        <property name="clusterKey" value="BX" />
+        <property name="commonConfig" ref="commonConfig"/>
+    </bean>
+</beans>