import com.dderp.business.dao.StoreDao import com.dderp.common.api.NoSqlKeysService import com.dderp.common.datas.ESKeys import com.dderp.common.datas.RedisKeys import com.dderp.common.entity.store.StoreBrand import com.dderp.common.entity.store.StoreInfo import com.dderp.common.entity.store.StoreInvoiceInfo import com.dderp.common.entity.store.StorePlatform import com.dderp.common.entity.store.StorePlatformRequire import com.dySweetFishPlugin.elasticsearch.ESClient import com.dySweetFishPlugin.redis.RedisService import com.sweetfish.convert.json.JsonConvert import com.sweetfish.source.PageFlipper import com.sweetfish.util.Utility import org.apache.logging.log4j.Logger import org.elasticsearch.action.bulk.BulkRequestBuilder import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.index.IndexRequestBuilder import org.elasticsearch.common.xcontent.XContentType import org.rex.RMap static def createRunnable(ESClient esClient, RedisService redisService, NoSqlKeysService keysService, StoreDao storeDao, Logger logger, JsonConvert jsonConvert, String dataSourceId, long supplierCode) { return { esClient.createIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_BRAND_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_DELIVER_STORE_BRAND_TYPE, "storebrand.json")) redisService.del(keysService.getRedisKey(RedisKeys.KEY_DELIVER_STORE_BRAND, supplierCode)) esClient.createIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_INFO_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_DELIVER_STORE_INFO_TYPE, "storeinfo.json", ESKeys.ES_DELIVER_STORE_INVOICE_INFO_TYPE, "storeinvoiceinfo.json")) esClient.createIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_PLATFORM_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_DELIVER_STORE_PLATFORM_TYPE, "storeplatform.json", ESKeys.ES_DELIVER_STORE_PLATFORM_REQUIRE_TYPE, "storeplatformrequire.json")) RMap params = new RMap() params.put("voidFlag", -1) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List storeBrandList = storeDao.queryStoreBrandList(params, p, dataSourceId, supplierCode) if (!storeBrandList.isEmpty()) { Map redisMap = storeBrandList.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] } if (!redisMap.isEmpty()) { redisService.hmset(keysService.getRedisKey(RedisKeys.KEY_DELIVER_STORE_BRAND, supplierCode), redisMap) } BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() storeBrandList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_BRAND_INDEX, supplierCode), ESKeys.ES_DELIVER_STORE_BRAND_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引门店品牌档案出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引门店品牌档案,共" + storeBrandList.size() + "条记录") } } }) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List storeInfoList = storeDao.queryStoreInfoList(params, p, dataSourceId, supplierCode) if (!storeInfoList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() storeInfoList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_INFO_INDEX, supplierCode), ESKeys.ES_DELIVER_STORE_INFO_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引门店出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引门店,共" + storeInfoList.size() + "条记录") } } }) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List storeInvoiceInfoList = storeDao.queryStoreInvoiceList(params, p, dataSourceId, supplierCode) if (!storeInvoiceInfoList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() storeInvoiceInfoList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_INFO_INDEX, supplierCode), ESKeys.ES_DELIVER_STORE_INVOICE_INFO_TYPE) .setId(String.valueOf(x.getId())) .setParent(String.valueOf(x.idStore)) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引门店开票信息出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引门店开票信息,共" + storeInvoiceInfoList.size() + "条记录") } } }) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List storePlatformList = storeDao.queryStorePlatformList(params, p, dataSourceId, supplierCode) if (!storePlatformList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() storePlatformList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_PLATFORM_INDEX, supplierCode), ESKeys.ES_DELIVER_STORE_PLATFORM_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引门店已开通平台出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引门店已开通平台,共" + storePlatformList.size() + "条记录") } } }) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List storePlatformRequireList = storeDao.queryStorePlatformRequireList(params, p, dataSourceId, supplierCode) if (!storePlatformRequireList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() storePlatformRequireList.each { x -> IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_DELIVER_STORE_PLATFORM_INDEX, supplierCode), ESKeys.ES_DELIVER_STORE_PLATFORM_REQUIRE_TYPE) .setId(String.valueOf(x.getId())) .setParent(String.valueOf(x.idStorePlatform)) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) } BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引门店已开通平台需求条件出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引门店已开通平台需求条件,共" + storePlatformRequireList.size() + "条记录") } } }) } as Runnable } createRunnable(esClient, redisService, keysService, storeDao, logger, jsonConvert, dataSourceId, supplierCode)