|
|
@@ -0,0 +1,184 @@
|
|
|
+package com.dderp.business.service.flycat;
|
|
|
+
|
|
|
+import com.dderp.common.api.NoSqlKeysService;
|
|
|
+import com.dderp.common.api.PlatformService;
|
|
|
+import com.dderp.common.api.flycat.ExpressGeoService;
|
|
|
+import com.dderp.common.api.flycat.ExpressOutService;
|
|
|
+import com.dderp.common.base.BaseService;
|
|
|
+import com.dderp.common.datas.RedisKeys;
|
|
|
+import com.dderp.common.entity.base.InvokeCallParams;
|
|
|
+import com.dderp.common.entity.base.InvokeCallResult;
|
|
|
+import com.dderp.common.entity.geo.RiderGeoInfo;
|
|
|
+import com.dderp.common.entity.platform.PlatformInfo;
|
|
|
+import com.dderp.common.tool.ERPUtils;
|
|
|
+import com.dySweetFishPlugin.redis.RedisService;
|
|
|
+import com.dySweetFishPlugin.tool.lang.ThreadFactoryWithNamePrefix;
|
|
|
+import com.sweetfish.convert.json.JsonConvert;
|
|
|
+import com.sweetfish.service.Local;
|
|
|
+import com.sweetfish.service.RetResult;
|
|
|
+import com.sweetfish.util.AnyValue;
|
|
|
+import com.sweetfish.util.AutoLoad;
|
|
|
+import com.sweetfish.util.ResourceType;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.stream.IntStream;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 配送平台坐标服务
|
|
|
+ */
|
|
|
+@AutoLoad(false)
|
|
|
+@Local
|
|
|
+@ResourceType(ExpressGeoService.class)
|
|
|
+public class ExpressGeoServiceImpl extends BaseService implements ExpressGeoService {
|
|
|
+
|
|
|
+ @Resource(name = "property.riderGeoShards")
|
|
|
+ private int riderGeoShards;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ RedisService redisService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ NoSqlKeysService keysService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ JsonConvert jsonConvert;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ PlatformService platformService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ ExpressOutService expressOutService;
|
|
|
+
|
|
|
+ private final List<ScheduledThreadPoolExecutor> scheduleScanThreadList = new ArrayList<>();
|
|
|
+
|
|
|
+ private ExecutorService riderWorkExecutor;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void init(AnyValue config) {
|
|
|
+ super.init(config);
|
|
|
+
|
|
|
+ riderWorkExecutor = new ThreadPoolExecutor(
|
|
|
+ 8, //corePoolSize 线程池维护线程的最少数量
|
|
|
+ 8, // maxPoolSize 线程池维护线程的最大数量
|
|
|
+ 60, //keepAliveSeconds 线程池维护线程所允许的空闲时间
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingDeque<>(Integer.MAX_VALUE),
|
|
|
+ new ThreadFactoryWithNamePrefix("[骑手坐标上传线程池]"));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void destroy(AnyValue config) {
|
|
|
+ super.destroy(config);
|
|
|
+
|
|
|
+ scheduleScanThreadList.forEach(ScheduledThreadPoolExecutor::shutdown);
|
|
|
+
|
|
|
+ if (riderWorkExecutor != null) {
|
|
|
+ riderWorkExecutor.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 骑手坐标订单平台需要上传,参考
|
|
|
+ * <a href="https://developer.open-douyin.com/docs/resource/zh-CN/local-life/develop/OpenAPI/takeout/takeout_order/takeout_order_distribution_sync">...</a>
|
|
|
+ * 工作原理
|
|
|
+ * 1、出于订单数据和实时性考量,在订单 骑手已取货 状态 开始后,将订单信息写入到redis的hash中,redis的key由订单id % 4,
|
|
|
+ * 2、hash中的key为订单id,值为 入口平台id,配送平台id,lat,lng
|
|
|
+ * 3、四个线程每个30秒扫各自的hash表,循环所有订单,扔到另一个线程池中处理
|
|
|
+ * 4、在骑手完成后,找到对应的订单,从redis中删除即可
|
|
|
+ *
|
|
|
+ * @param config 参数
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void start(AnyValue config) {
|
|
|
+ super.start(config);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void initScanThread(String dataSourceId, long supplierCode) {
|
|
|
+ IntStream.rangeClosed(0, riderGeoShards).forEach((x) -> {
|
|
|
+ ScheduledThreadPoolExecutor scanThread = new ScheduledThreadPoolExecutor(1, new ThreadFactoryWithNamePrefix("[骑手坐标列表轮询线程_" + x + "_" + supplierCode + "]"));
|
|
|
+ scanThread.scheduleWithFixedDelay(() -> {
|
|
|
+ Map<String, String> orderMap = redisService.hgetAll(keysService.getRedisKey(RedisKeys.KEY_ERP_ORDER_RIDER_GEO, supplierCode, true) + x);
|
|
|
+
|
|
|
+ orderMap.values().forEach((v) -> CompletableFuture.runAsync(() -> {
|
|
|
+ RiderGeoInfo geoInfo = jsonConvert.convertFromO(RiderGeoInfo.class, v);
|
|
|
+ String oldLat = geoInfo.getRiderLat();
|
|
|
+ String oldLng = geoInfo.getRiderLng();
|
|
|
+ PlatformInfo outPlatform = platformService.getPlatformInfo(geoInfo.getOutPlatformId(), false, supplierCode);
|
|
|
+ if (outPlatform == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ PlatformInfo inPlatform = platformService.getPlatformInfo(geoInfo.getInPlatformId(), false, supplierCode);
|
|
|
+ if (inPlatform == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ InvokeCallParams geoCallParams = InvokeCallParams.newBuilder()
|
|
|
+ .businessMethod("Express_RiderLocation_" + outPlatform.getPlatformCode())
|
|
|
+ .params(jsonConvert.convertTo(geoInfo))
|
|
|
+ .build();
|
|
|
+ //Express_RiderLocation接口要求给平台脚本返回的json字符串类型要统一,统一为RiderGeoInfo对象字符串即可
|
|
|
+ RetResult<InvokeCallResult> geoResult = expressOutService.callExpress(geoCallParams, ERPUtils.getSysTokenUser(), dataSourceId, supplierCode);
|
|
|
+ if (geoResult.isSuccess()) {
|
|
|
+ //todo 判断位置相同,其实在脚本里面判断应该就可以了,免得两次json操作
|
|
|
+ RiderGeoInfo newGeoInfo = jsonConvert.convertFromO(RiderGeoInfo.class, geoResult.getResult().getData());
|
|
|
+ if ((!StringUtils.equalsIgnoreCase(oldLat, newGeoInfo.getRiderLat())) || (!StringUtils.equalsIgnoreCase(oldLng, newGeoInfo.getRiderLng()))) {
|
|
|
+ //上传订单平台
|
|
|
+ InvokeCallParams syncCallParams = InvokeCallParams.newBuilder()
|
|
|
+ .businessMethod("Order_Sync_RiderLocation" + inPlatform.getPlatformCode())
|
|
|
+ .params(geoResult.getResult().getData())
|
|
|
+ .build();
|
|
|
+ RetResult<InvokeCallResult> syncResult = expressOutService.callExpress(syncCallParams, ERPUtils.getSysTokenUser(), dataSourceId, supplierCode);
|
|
|
+ if (!syncResult.isSuccess()) {
|
|
|
+ logger.error("订单同步骑手位置出错:" + inPlatform.getPlatformName() + " " + syncResult.getRetinfo());
|
|
|
+ } else {
|
|
|
+ //反写回redis
|
|
|
+ addOrderGeoInfo(newGeoInfo, supplierCode);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }, riderWorkExecutor));
|
|
|
+
|
|
|
+
|
|
|
+ }, 30, 40, TimeUnit.SECONDS);
|
|
|
+ scheduleScanThreadList.add(scanThread);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取物料对应的订单redis键,分片存储
|
|
|
+ *
|
|
|
+ * @param idOrder 订单id
|
|
|
+ * @param supplierCode 分表
|
|
|
+ * @return redis key
|
|
|
+ */
|
|
|
+ private String getRedisOrderGeoKey(long idOrder, long supplierCode) {
|
|
|
+ return keysService.getRedisKey(RedisKeys.KEY_ERP_ORDER_RIDER_GEO, supplierCode, true) + (idOrder % riderGeoShards);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 增加订单扫描位置信息
|
|
|
+ *
|
|
|
+ * @param riderGeoInfo 订单信息
|
|
|
+ * @param supplierCode 分表
|
|
|
+ */
|
|
|
+ public void addOrderGeoInfo(RiderGeoInfo riderGeoInfo, long supplierCode) {
|
|
|
+ String key = getRedisOrderGeoKey(riderGeoInfo.getIdOrder(), supplierCode);
|
|
|
+ redisService.hset(key, String.valueOf(riderGeoInfo.getIdOrder()), jsonConvert.convertTo(riderGeoInfo));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除订单扫描位置信息
|
|
|
+ *
|
|
|
+ * @param idOrder 订单id
|
|
|
+ * @param supplierCode 分表
|
|
|
+ */
|
|
|
+ public void delOrderGeoInfo(long idOrder, long supplierCode) {
|
|
|
+ String key = getRedisOrderGeoKey(idOrder, supplierCode);
|
|
|
+ redisService.hdel(key, String.valueOf(idOrder));
|
|
|
+ }
|
|
|
+}
|