import com.dderp.business.dao.PaperDao import com.dderp.common.api.NoSqlKeysService import com.dderp.common.datas.ESKeys import com.dderp.common.datas.RedisKeys import com.dderp.common.entity.paper.PaperBaseInfo import com.dderp.common.entity.paper.PaperBrand import com.dderp.common.entity.paper.PaperCutInfo import com.dderp.common.entity.paper.PaperCutSizeInfo import com.dderp.common.entity.paper.PaperInfo import com.dderp.common.entity.paper.PaperInfoType import com.dderp.common.entity.paper.PaperQuoteInfo import com.dySweetFishPlugin.elasticsearch.ESClient import com.dySweetFishPlugin.redis.RedisService import com.dySweetFishPlugin.sql.TableCodeNode import com.dySweetFishPlugin.sql.TableIdService import com.sweetfish.convert.json.JsonConvert import com.sweetfish.source.PageFlipper import com.sweetfish.util.Utility import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.Logger import org.elasticsearch.action.bulk.BulkRequestBuilder import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.index.IndexRequestBuilder import org.elasticsearch.common.xcontent.XContentType import org.rex.RMap static def createRunnable(ESClient esClient, NoSqlKeysService keysService, TableIdService tableIdService, PaperDao paperDao, JsonConvert jsonConvert, Logger logger, RedisService redisService, String dataSourceId, long supplierCode) { return { RMap params = new RMap() //纸张品牌档案 redisService.del(keysService.getRedisKey(RedisKeys.KEY_ERP_PAPER_BRAND, supplierCode)) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List brandList = paperDao.selectPaperBrand(params, p, dataSourceId, supplierCode) if (!brandList.isEmpty()) { Map brandMap = brandList.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] } redisService.hmset(keysService.getRedisKey(RedisKeys.KEY_ERP_PAPER_BRAND, supplierCode), brandMap) } }) //纸张开纸档案 redisService.del(keysService.getRedisKey(RedisKeys.KEY_ERP_PAPER_CUT_INFO, supplierCode)) List cutInfoList = paperDao.queryPaperCutInfo(params, dataSourceId, supplierCode) if (!cutInfoList.isEmpty()) { RMap subParams = new RMap() cutInfoList.forEach(x -> { subParams.set("idCutInfo", x.getId()) List sizeInfoList = paperDao.queryPaperCutSizeInfo(subParams, dataSourceId, supplierCode) x.setSizeInfoList(sizeInfoList) }) Map cutInfoMap = cutInfoList.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] } redisService.hmset(keysService.getRedisKey(RedisKeys.KEY_ERP_PAPER_CUT_INFO, supplierCode), cutInfoMap) } //纸张类型档案 esClient.createIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERTYPE_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_PAPERTYPE_TYPE, "papertype.json")) Ignore_ExecPageFlipper.execute(500, { PageFlipper p -> List paperInfoTypeList = paperDao.selectPaperInfoType(params, p, dataSourceId, supplierCode) if (!paperInfoTypeList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() paperInfoTypeList.forEach((x) -> { IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERTYPE_INDEX, supplierCode), ESKeys.ES_ERP_PAPERTYPE_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) if (StringUtils.isNotBlank(x.getPaperTypeCode())) { tableIdService.addTableCodeNode(TableCodeNode.newBuilder() .codeName("tbPaperInfo.code." + x.getPaperTypeCode().trim()) .tableName("tbPaperInfo") .codeFieldName("paperCode") .codePrefix(x.getPaperTypeCode()) .codeYear(false) .yearLength(4) .codeMonth(false) .codeDay(false) .codeLength(4) .build(), true, dataSourceId, String.valueOf(supplierCode)) } }) BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引纸张分类出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引纸张分类,共" + paperInfoTypeList.size() + "条记录") } } }) //基础纸张档案 esClient.createIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERBASEINFO_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_PAPERBASEINFO_TYPE, "paperbaseinfo.json")) Ignore_ExecPageFlipper.execute(500, { PageFlipper p -> List paperBaseInfoList = paperDao.selectPaperBaseInfo(params, p, dataSourceId, supplierCode) if (!paperBaseInfoList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() paperBaseInfoList.forEach((x) -> { IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERBASEINFO_INDEX, supplierCode), ESKeys.ES_ERP_PAPERBASEINFO_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) }) BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引纸张档案出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引纸张档案,共" + paperBaseInfoList.size() + "条记录") } } }) //纸张报价档案 esClient.createIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERQUOTEINFO_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_PAPERQUOTEINFO_TYPE, "paperquoteinfo.json")) Ignore_ExecPageFlipper.execute(500, { PageFlipper p -> List paperQuoteInfos = paperDao.selectPaperQuoteInfo(params, p, dataSourceId, supplierCode) if (!paperQuoteInfos.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() paperQuoteInfos.forEach((x) -> { IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERQUOTEINFO_INDEX, supplierCode), ESKeys.ES_ERP_PAPERQUOTEINFO_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) }) BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引纸张报价档案出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引纸张报价档案,共" + paperQuoteInfos.size() + "条记录") } } }) //纸张档案 esClient.createIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERINFO_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_PAPERINFO_TYPE, "paperinfo.json")) Ignore_ExecPageFlipper.execute(500, { PageFlipper p -> List paperInfoList = paperDao.selectPaperInfo(params, p, dataSourceId, supplierCode) if (!paperInfoList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() paperInfoList.forEach((x) -> { IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_ERP_PAPERINFO_INDEX, supplierCode), ESKeys.ES_ERP_PAPERINFO_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) }) BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引纸张档案出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引纸张档案,共" + paperInfoList.size() + "条记录") } } }) } as Runnable } //groovy最后一个表达式的值为返回 createRunnable(esClient, keysService, tableIdService, paperDao, jsonConvert, logger, redisService, dataSourceId, supplierCode)