|
|
@@ -0,0 +1,194 @@
|
|
|
+import com.dderp.common.api.BusinessExecutor
|
|
|
+import com.dderp.common.api.SupplierInitService
|
|
|
+import com.dderp.common.datas.ERPModule
|
|
|
+import com.dderp.common.datas.ESKeys
|
|
|
+import com.dderp.common.entity.base.ProcessStringItem
|
|
|
+import com.dderp.common.entity.order.*
|
|
|
+import com.dySweetFishPlugin.elasticsearch.ESClient
|
|
|
+import com.dySweetFishPlugin.sql.DBService
|
|
|
+import com.sweetfish.convert.json.JsonConvert
|
|
|
+import com.sweetfish.service.RetResult
|
|
|
+import com.sweetfish.source.PageFlipper
|
|
|
+import com.sweetfish.util.Utility
|
|
|
+import org.apache.logging.log4j.LogManager
|
|
|
+import org.apache.logging.log4j.Logger
|
|
|
+import org.elasticsearch.action.bulk.BulkRequestBuilder
|
|
|
+import org.elasticsearch.action.bulk.BulkResponse
|
|
|
+import org.elasticsearch.action.index.IndexRequestBuilder
|
|
|
+import org.elasticsearch.action.support.WriteRequest
|
|
|
+import org.elasticsearch.common.xcontent.XContentType
|
|
|
+
|
|
|
+import javax.annotation.Resource
|
|
|
+
|
|
|
+@SuppressWarnings(["unused"])
|
|
|
+class BE_BusinessOrder_SyncDataBase2ES implements BusinessExecutor<ProcessStringItem, ProcessStringItem> {
|
|
|
+
|
|
|
+ protected final Logger logger = LogManager.getLogger(this.getClass().getSimpleName())
|
|
|
+
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ SupplierInitService supplierService
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ ESClient esClient
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ JsonConvert jsonConvert
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ DBService dbService
|
|
|
+
|
|
|
+ @Override
|
|
|
+ String scriptName() {
|
|
|
+ return "外卖订单-ES同步数据库"
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ ERPModule module() {
|
|
|
+ return ERPModule.DATA_SYNC
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ RetResult<ProcessStringItem> execute(ProcessStringItem source) {
|
|
|
+ String dataSourceId = source.getDataSourceId()
|
|
|
+ long supplierCode = source.getSupplierCode()
|
|
|
+
|
|
|
+ esClient.checkIndexEx(supplierService.getDateYearESIndex(supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 0),
|
|
|
+ ESKeys.INDEX_CONFIG,
|
|
|
+ Utility.ofMap(ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, "businessorder.json",
|
|
|
+ ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, "orderdeliveryinfo.json",
|
|
|
+ ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, "orderfinances.json",
|
|
|
+ ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, "orderdetailitem.json",
|
|
|
+ ESKeys.ES_DELIVER_ORDER_AFTER_SALE_BILL_TYPE, "orderaftersalebill.json",
|
|
|
+ ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, "orderrideritem.json"))
|
|
|
+
|
|
|
+ ExecPageFlipper.execute(500, { PageFlipper p ->
|
|
|
+ List<BusinessOrder> dbList = dbService.<BusinessOrder> getList(dataSourceId, "select * from deBusinessOrder${supplierCode}_Current", BusinessOrder.class, p)
|
|
|
+ if (!dbList.isEmpty()) {
|
|
|
+ BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
+
|
|
|
+ dbList.each { order ->
|
|
|
+ IndexRequestBuilder esRequest = esClient.getClient()
|
|
|
+ .prepareIndex(supplierService.getDateYearESIndex(source.supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1), ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE)
|
|
|
+ .setId(String.valueOf(order.id))
|
|
|
+ .setSource(source.getConvert("BusinessOrder").convertTo(order), XContentType.JSON)
|
|
|
+ bulkRequest.add(esRequest)
|
|
|
+ }
|
|
|
+
|
|
|
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet()
|
|
|
+
|
|
|
+ if (bulkResponse.hasFailures()) {
|
|
|
+ logger.error("索引订单:" + bulkResponse.buildFailureMessage())
|
|
|
+ } else {
|
|
|
+ logger.info("索引订单共" + dbList.size() + "条记录")
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ ExecPageFlipper.execute(500, { PageFlipper p ->
|
|
|
+ List<OrderDetailItem> dbList = dbService.<OrderDetailItem> getList(dataSourceId, "select * from deOrderDetailItem${supplierCode}_Current", OrderDetailItem.class, p)
|
|
|
+ if (!dbList.isEmpty()) {
|
|
|
+ BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
+
|
|
|
+ dbList.each { info ->
|
|
|
+ IndexRequestBuilder esRequest = esClient.getClient()
|
|
|
+ .prepareIndex(supplierService.getDateYearESIndex(source.supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1), ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE)
|
|
|
+ .setParent(String.valueOf(info.idOrder))
|
|
|
+ .setId(String.valueOf(info.id))
|
|
|
+ .setSource(jsonConvert.convertTo(info), XContentType.JSON)
|
|
|
+ bulkRequest.add(esRequest)
|
|
|
+ }
|
|
|
+
|
|
|
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet()
|
|
|
+
|
|
|
+ if (bulkResponse.hasFailures()) {
|
|
|
+ logger.error("索引订单detailItem:" + bulkResponse.buildFailureMessage())
|
|
|
+ } else {
|
|
|
+ logger.info("索引订单共detailItem" + dbList.size() + "条记录")
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ ExecPageFlipper.execute(500, { PageFlipper p ->
|
|
|
+ List<OrderDeliveryInfo> dbList = dbService.<OrderDeliveryInfo> getList(dataSourceId, "select * from deOrderDeliveryInfo${supplierCode}_Current", OrderDeliveryInfo.class, p)
|
|
|
+ if (!dbList.isEmpty()) {
|
|
|
+ BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
+
|
|
|
+ dbList.each { info ->
|
|
|
+ IndexRequestBuilder esRequest = esClient.getClient()
|
|
|
+ .prepareIndex(supplierService.getDateYearESIndex(source.supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1), ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE)
|
|
|
+ .setParent(String.valueOf(info.idOrder))
|
|
|
+ .setId(String.valueOf(info.id))
|
|
|
+ .setSource(jsonConvert.convertTo(info), XContentType.JSON)
|
|
|
+ bulkRequest.add(esRequest)
|
|
|
+ }
|
|
|
+
|
|
|
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet()
|
|
|
+
|
|
|
+ if (bulkResponse.hasFailures()) {
|
|
|
+ logger.error("索引订单DeliveryInfo:" + bulkResponse.buildFailureMessage())
|
|
|
+ } else {
|
|
|
+ logger.info("索引订单共DeliveryInfo" + dbList.size() + "条记录")
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ ExecPageFlipper.execute(500, { PageFlipper p ->
|
|
|
+ List<OrderFinances> dbList = dbService.<OrderFinances> getList(dataSourceId, "select * from deOrderFinances${supplierCode}_Current", OrderFinances.class, p)
|
|
|
+ if (!dbList.isEmpty()) {
|
|
|
+ BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
+
|
|
|
+ dbList.each { info ->
|
|
|
+ IndexRequestBuilder esRequest = esClient.getClient()
|
|
|
+ .prepareIndex(supplierService.getDateYearESIndex(source.supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1), ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE)
|
|
|
+ .setParent(String.valueOf(info.idOrder))
|
|
|
+ .setId(String.valueOf(info.id))
|
|
|
+ .setSource(jsonConvert.convertTo(info), XContentType.JSON)
|
|
|
+ bulkRequest.add(esRequest)
|
|
|
+ }
|
|
|
+
|
|
|
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet()
|
|
|
+
|
|
|
+ if (bulkResponse.hasFailures()) {
|
|
|
+ logger.error("索引订单OrderFinances:" + bulkResponse.buildFailureMessage())
|
|
|
+ } else {
|
|
|
+ logger.info("索引订单共OrderFinances" + dbList.size() + "条记录")
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ ExecPageFlipper.execute(500, { PageFlipper p ->
|
|
|
+ List<OrderRiderItem> dbList = dbService.<OrderRiderItem> getList(dataSourceId, "select * from deOrderRiderItem${supplierCode}_Current", OrderRiderItem.class, p)
|
|
|
+ if (!dbList.isEmpty()) {
|
|
|
+ BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
|
|
|
+
|
|
|
+ dbList.each { info ->
|
|
|
+ IndexRequestBuilder esRequest = esClient.getClient()
|
|
|
+ .prepareIndex(supplierService.getDateYearESIndex(source.supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1), ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE)
|
|
|
+ .setParent(String.valueOf(info.idOrder))
|
|
|
+ .setId(String.valueOf(info.id))
|
|
|
+ .setSource(jsonConvert.convertTo(info), XContentType.JSON)
|
|
|
+ bulkRequest.add(esRequest)
|
|
|
+ }
|
|
|
+
|
|
|
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet()
|
|
|
+
|
|
|
+ if (bulkResponse.hasFailures()) {
|
|
|
+ logger.error("索引订单OrderRiderItem:" + bulkResponse.buildFailureMessage())
|
|
|
+ } else {
|
|
|
+ logger.info("索引订单共OrderRiderItem" + dbList.size() + "条记录")
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+
|
|
|
+ return RetResult.successT()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|