| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- import com.dderp.business.dao.StoreDao
- import com.dderp.common.api.NoSqlKeysService
- import com.dderp.common.datas.ESKeys
- import com.dderp.common.datas.RedisKeys
- import com.dderp.common.entity.store.StoreBrand
- import com.dderp.common.entity.store.StoreInfo
- import com.dderp.common.entity.store.StoreInvoiceInfo
- import com.dderp.common.entity.store.StorePlatform
- import com.dderp.common.entity.store.StorePlatformRequire
- import com.dySweetFishPlugin.elasticsearch.ESClient
- import com.dySweetFishPlugin.redis.RedisService
- import com.sweetfish.convert.json.JsonConvert
- import com.sweetfish.source.PageFlipper
- import com.sweetfish.util.Utility
- import org.apache.logging.log4j.Logger
- import org.elasticsearch.action.bulk.BulkRequestBuilder
- import org.elasticsearch.action.bulk.BulkResponse
- import org.elasticsearch.action.index.IndexRequestBuilder
- import org.elasticsearch.common.xcontent.XContentType
- import org.rex.RMap
/**
 * Builds the task that loads one supplier's store data from {@code storeDao} and writes it to
 * Elasticsearch (and, for store brands only, to a Redis hash as well).
 *
 * When the returned {@link Runnable} is run it performs, in order:
 * <ol>
 *   <li>{@code esClient.createIndex} for the store-brand, store-info and store-platform indices
 *       (the brand Redis key is deleted right after the brand index is created);</li>
 *   <li>a paged load of store brands, written to the Redis hash and bulk-indexed;</li>
 *   <li>paged loads of store info, store invoice info, store platforms and store platform
 *       requirements, each bulk-indexed. Invoice info and platform requirements are indexed with
 *       a parent id ({@code idStore} and {@code idStorePlatform} respectively).</li>
 * </ol>
 *
 * A bulk request that reports failures is only logged through {@code logger.error}; the task then
 * carries on with the next page / entity type.
 *
 * @param esClient     Elasticsearch wrapper used to create indices and build bulk requests
 * @param redisService Redis wrapper used for the store-brand hash
 * @param keysService  resolves supplier-specific Elasticsearch index names and Redis keys
 * @param storeDao     source of the store records
 * @param logger       receives one info or error line per bulk request
 * @param jsonConvert  serialises each record to the JSON that is stored
 * @param dataSourceId passed through unchanged to every {@code storeDao} query
 * @param supplierCode supplier whose keys are resolved and whose records are queried
 * @return the task, coerced to {@link Runnable}; nothing is executed until it is run
 */
static def createRunnable(ESClient esClient, RedisService redisService, NoSqlKeysService keysService,
                          StoreDao storeDao, Logger logger, JsonConvert jsonConvert, String dataSourceId, long supplierCode) {
    return {
        // Each supplier-specific key is resolved once here and reused below, rather than being
        // looked up again for every record that is indexed.
        String brandIndex = keysService.getESKey(ESKeys.ES_DELIVER_STORE_BRAND_INDEX, supplierCode)
        esClient.createIndex(brandIndex,
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_DELIVER_STORE_BRAND_TYPE, "storebrand.json"))

        String brandRedisKey = keysService.getRedisKey(RedisKeys.KEY_DELIVER_STORE_BRAND, supplierCode)
        redisService.del(brandRedisKey)

        String storeIndex = keysService.getESKey(ESKeys.ES_DELIVER_STORE_INFO_INDEX, supplierCode)
        esClient.createIndex(storeIndex,
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_DELIVER_STORE_INFO_TYPE, "storeinfo.json",
                        ESKeys.ES_DELIVER_STORE_INVOICE_INFO_TYPE, "storeinvoiceinfo.json"))

        String platformIndex = keysService.getESKey(ESKeys.ES_DELIVER_STORE_PLATFORM_INDEX, supplierCode)
        esClient.createIndex(platformIndex,
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_DELIVER_STORE_PLATFORM_TYPE, "storeplatform.json",
                        ESKeys.ES_DELIVER_STORE_PLATFORM_REQUIRE_TYPE, "storeplatformrequire.json"))

        // Bulk-indexes one page of records into indexName/typeName, using each record's id as the
        // document id, and logs the outcome. `label` is the entity name embedded in the log line.
        // When parentIdOf is given, its result for a record becomes that document's parent id.
        // A null or empty page is skipped without sending a request.
        def bulkIndex = { String indexName, String typeName, String label, List rows, Closure parentIdOf = null ->
            if (!rows) {
                return
            }
            BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk()
            rows.each { row ->
                IndexRequestBuilder indexRequest = esClient.getClient()
                        .prepareIndex(indexName, typeName)
                        .setId(String.valueOf(row.getId()))
                if (parentIdOf != null) {
                    indexRequest.setParent(String.valueOf(parentIdOf(row)))
                }
                indexRequest.setSource(jsonConvert.convertTo(row), XContentType.JSON)
                bulkRequest.add(indexRequest)
            }
            BulkResponse bulkResponse = bulkRequest.execute().actionGet()
            if (bulkResponse.hasFailures()) {
                logger.error("索引" + label + "出错:" + bulkResponse.buildFailureMessage())
            } else {
                logger.info("索引" + label + ",共" + rows.size() + "条记录")
            }
        }

        // The same filter is shared by all five queries.
        // NOTE(review): voidFlag = -1 presumably means "do not filter on void status" — confirm in StoreDao.
        RMap params = new RMap()
        params.put("voidFlag", -1)

        // NOTE(review): 1000 is presumably the page size used by Ignore_ExecPageFlipper — confirm.

        // Store brands: cached in the Redis hash (record id -> JSON) and indexed.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List<StoreBrand> brands = storeDao.queryStoreBrandList(params, p, dataSourceId, supplierCode)
            if (brands) {
                Map<String, String> brandJsonById = brands.collectEntries { [String.valueOf(it.getId()), jsonConvert.convertTo(it)] }
                redisService.hmset(brandRedisKey, brandJsonById)
            }
            bulkIndex(brandIndex, ESKeys.ES_DELIVER_STORE_BRAND_TYPE, "门店品牌档案", brands)
        })

        // Stores.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List<StoreInfo> stores = storeDao.queryStoreInfoList(params, p, dataSourceId, supplierCode)
            bulkIndex(storeIndex, ESKeys.ES_DELIVER_STORE_INFO_TYPE, "门店", stores)
        })

        // Store invoice info: lives in the store index, parented by the owning store's id.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List<StoreInvoiceInfo> invoices = storeDao.queryStoreInvoiceList(params, p, dataSourceId, supplierCode)
            bulkIndex(storeIndex, ESKeys.ES_DELIVER_STORE_INVOICE_INFO_TYPE, "门店开票信息", invoices, { it.idStore })
        })

        // Store platforms.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List<StorePlatform> platforms = storeDao.queryStorePlatformList(params, p, dataSourceId, supplierCode)
            bulkIndex(platformIndex, ESKeys.ES_DELIVER_STORE_PLATFORM_TYPE, "门店已开通平台", platforms)
        })

        // Store platform requirements: live in the platform index, parented by the platform's id.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List<StorePlatformRequire> requirements = storeDao.queryStorePlatformRequireList(params, p, dataSourceId, supplierCode)
            bulkIndex(platformIndex, ESKeys.ES_DELIVER_STORE_PLATFORM_REQUIRE_TYPE, "门店已开通平台需求条件", requirements, { it.idStorePlatform })
        })
    } as Runnable
}
// Script entry point: build the task and return it as the value of this script.
// None of the arguments are declared in this script; presumably they are supplied through the
// script binding by whoever evaluates it — TODO confirm against the caller.
return createRunnable(
        esClient, redisService, keysService, storeDao,
        logger, jsonConvert, dataSourceId, supplierCode)
|