import com.dderp.business.dao.ClientDao
import com.dderp.common.api.NoSqlKeysService
import com.dderp.common.datas.RedisKeys
import com.dderp.common.datas.ESKeys
import com.dderp.common.entity.client.ClientIndustryInfo
import com.dderp.common.entity.client.ClientInfo
import com.dderp.common.entity.client.ClientLevel
import com.dderp.common.entity.client.ClientReceiveAddress
import com.dySweetFishPlugin.elasticsearch.ESClient
import com.dySweetFishPlugin.redis.RedisService
import com.sweetfish.convert.json.JsonConvert
import com.sweetfish.source.PageFlipper
import com.sweetfish.util.Utility
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.bulk.BulkRequestBuilder
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.index.IndexRequestBuilder
import org.elasticsearch.common.xcontent.XContentType
import org.rex.RMap

/**
 * Builds a Runnable that reloads client-related data for one supplier.
 *
 * When run, the task performs these steps in order:
 *   1. deletes the client-industry Redis hash and refills it page by page
 *      with id -> JSON entries read from clientDao;
 *   2. does the same for the client-level Redis hash;
 *   3. if resetClientInfo is true, calls createIndex for the client-info
 *      Elasticsearch index and bulk-indexes every client record; otherwise
 *      only calls checkIndexEx for that index;
 *   4. calls createIndex for the receive-address index and bulk-indexes every
 *      receive address.
 *
 * Bulk failures are logged through logger and do not abort the task.
 *
 * @param esClient        Elasticsearch wrapper used to create/check indices and build bulk requests
 * @param clientDao       DAO that supplies the paged source data
 * @param resetClientInfo true to re-create and refill the client-info index, false to only check it
 * @param keysService     resolves the supplier-specific Redis keys and ES index names
 * @param jsonConvert     serialises each entity to the JSON stored in Redis / ES
 * @param logger          receives the bulk-indexing result messages
 * @param redisService    Redis client used for del / hmset
 * @param dataSourceId    passed through unchanged to every DAO query
 * @param supplierCode    supplier whose data is reloaded
 * @return the reload task as a Runnable
 */
static def createRunnable(ESClient esClient, ClientDao clientDao, boolean resetClientInfo, NoSqlKeysService keysService, JsonConvert jsonConvert, Logger logger, RedisService redisService, String dataSourceId, long supplierCode) {
    return {
        RMap params = new RMap()

        // Clears one Redis hash and refills it with id -> JSON entries, one DAO page at a time.
        def reloadRedisHash = { keyName, Closure fetchPage ->
            // Resolved once: the same key is used for the delete and for every page written.
            def redisKey = keysService.getRedisKey(keyName, supplierCode)
            redisService.del(redisKey)
            Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
                List rows = fetchPage(p)
                if (!rows.isEmpty()) {
                    Map entries = rows.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] }
                    redisService.hmset(redisKey, entries)
                }
            })
        }

        // Bulk-indexes every DAO page into the given ES index/type and logs the outcome per page.
        def bulkIndexPages = { indexName, typeName, String logLabel, Closure fetchPage ->
            Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
                List docs = fetchPage(p)
                if (!docs.isEmpty()) {
                    BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk()
                    docs.each { doc ->
                        IndexRequestBuilder indexRequest = esClient.getClient()
                                .prepareIndex(indexName, typeName)
                                .setId(String.valueOf(doc.getId()))
                                .setSource(jsonConvert.convertTo(doc), XContentType.JSON)
                        bulkRequest.add(indexRequest)
                    }
                    BulkResponse bulkResponse = bulkRequest.execute().actionGet()
                    if (bulkResponse.hasFailures()) {
                        logger.error(logLabel + ":" + bulkResponse.buildFailureMessage())
                    } else {
                        logger.info(logLabel + "共" + docs.size() + "条记录")
                    }
                }
            })
        }

        // Redis caches: client industry info, then client level.
        reloadRedisHash(RedisKeys.KEY_ERP_CLIENT_INDUSTRY_INFO) { PageFlipper p ->
            clientDao.queryClientIndustry(params, p, dataSourceId, supplierCode)
        }
        reloadRedisHash(RedisKeys.KEY_ERP_CLIENT_LEVEL) { PageFlipper p ->
            clientDao.queryClientLevel(params, p, dataSourceId, supplierCode)
        }

        // Client-info index: refilled only when a reset was requested.
        def clientInfoIndex = keysService.getESKey(ESKeys.ES_ERP_CLIENT_INFO_INDEX, supplierCode)
        if (resetClientInfo) {
            esClient.createIndex(clientInfoIndex, ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_CLIENT_INFO_TYPE, "clientinfo.json"))
            bulkIndexPages(clientInfoIndex, ESKeys.ES_ERP_CLIENT_INFO_TYPE, "索引客户档案") { PageFlipper p ->
                clientDao.queryClientInfo(params, p, dataSourceId, supplierCode)
            }
        } else {
            esClient.checkIndexEx(clientInfoIndex, ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_CLIENT_INFO_TYPE, "clientinfo.json"))
        }

        // Receive-address index: always created and refilled.
        def addressIndex = keysService.getESKey(ESKeys.ES_ERP_CLIENT_RECEIVEADDRESS_INDEX, supplierCode)
        esClient.createIndex(addressIndex, ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_CLIENT_RECEIVEADDRESS_TYPE, "clientreceiveaddress.json"))
        bulkIndexPages(addressIndex, ESKeys.ES_ERP_CLIENT_RECEIVEADDRESS_TYPE, "索引客户收货地址") { PageFlipper p ->
            clientDao.queryClientReceiveAddress(params, p, dataSourceId, supplierCode)
        }
    } as Runnable
}

// In Groovy the value of the last expression is the script's return value.
createRunnable(esClient, clientDao, resetClientInfo, keysService, jsonConvert, logger, redisService, dataSourceId, supplierCode)