import com.dderp.business.dao.PlatformDao import com.dderp.common.api.NoSqlKeysService import com.dderp.common.datas.ESKeys import com.dderp.common.datas.RedisKeys import com.dderp.common.entity.platform.PlatformInfo import com.dderp.common.entity.platform.PlatformRequire import com.dySweetFishPlugin.elasticsearch.ESClient import com.dySweetFishPlugin.redis.RedisService import com.sweetfish.convert.json.JsonConvert import com.sweetfish.source.PageFlipper import com.sweetfish.util.Utility import org.apache.logging.log4j.Logger import org.elasticsearch.action.bulk.BulkRequestBuilder import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.index.IndexRequestBuilder import org.elasticsearch.common.xcontent.XContentType import org.rex.RMap static def createRunnable(ESClient esClient, RedisService redisService, NoSqlKeysService keysService, PlatformDao platformDao, Logger logger, JsonConvert jsonConvert, String dataSourceId, long supplierCode) { return { esClient.createIndex(keysService.getESKey(ESKeys.ES_DELIVER_PLATFORM_INFO_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_DELIVER_PLATFORM_INFO_TYPE, "platforminfo.json")) redisService.del(keysService.getRedisKey(RedisKeys.KEY_DELIVER_PLATFORM_INFO, supplierCode)) esClient.createIndex(keysService.getESKey(ESKeys.ES_DELIVER_PLATFORM_REQUIRE_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_DELIVER_PLATFORM_REQUIRE_TYPE, "platformrequire.json")) redisService.del(keysService.getRedisKey(RedisKeys.KEY_DELIVER_PLATFORM_REQUIRE, supplierCode)) RMap params = new RMap() params.put("voidFlag", -1) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List platformInfoList = platformDao.queryPlatformInfoList(params, p, dataSourceId, supplierCode) if (!platformInfoList.isEmpty()) { Map redisMap = platformInfoList.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] } if (!redisMap.isEmpty()) { redisService.hmset(keysService.getRedisKey(RedisKeys.KEY_DELIVER_PLATFORM_INFO, supplierCode), redisMap) } BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() platformInfoList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_PLATFORM_INFO_INDEX, supplierCode), ESKeys.ES_DELIVER_PLATFORM_INFO_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引平台档案出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引平台档案,共" + platformInfoList.size() + "条记录") } } }) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List platformRequireList = platformDao.queryPlatformRequireList(params, p, dataSourceId, supplierCode) if (!platformRequireList.isEmpty()) { Map redisMap = platformRequireList.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] } if (!redisMap.isEmpty()) { redisService.hmset(keysService.getRedisKey(RedisKeys.KEY_DELIVER_PLATFORM_REQUIRE, supplierCode), redisMap) } BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() platformRequireList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_PLATFORM_REQUIRE_INDEX, supplierCode), ESKeys.ES_DELIVER_PLATFORM_REQUIRE_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引平台需求条件档案出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引平台需求条件档案,共" + platformRequireList.size() + "条记录") } } }) } as Runnable } createRunnable(esClient, redisService, keysService, platformDao, logger, jsonConvert, dataSourceId, supplierCode)