import com.yinjie.heating.common.api.NoSqlKeysService import com.yinjie.heating.common.datas.ESKeys import com.dySweetFishPlugin.elasticsearch.ESClient import org.apache.logging.log4j.Logger import org.elasticsearch.action.ActionListener import org.elasticsearch.index.query.BoolQueryBuilder import org.elasticsearch.index.query.QueryBuilders import org.elasticsearch.index.reindex.BulkByScrollResponse import org.elasticsearch.index.reindex.UpdateByQueryAction import org.elasticsearch.script.Script import org.elasticsearch.script.ScriptType //系统群发消息过期, 系统启动后1分钟后执行一遍标记过期操作,然后每隔5分钟执行一次过期操作 static def createRunnable(ESClient esClient, NoSqlKeysService keysService, Logger logger, long supplierCode) { return { //只有系统群发消息才有过期设置,点对点发的无此功能 long current = (new Date()).getTime() String index = keysService.getESKey(ESKeys.ESERPSYSMSG_INDEX, supplierCode) BoolQueryBuilder qb = QueryBuilders.boolQuery() qb = qb.must(QueryBuilders.rangeQuery("expireTime").gte(current)) .must(QueryBuilders.termQuery("expireFlag", 0)) .must(QueryBuilders.typeQuery(ESKeys.ESERPDEFAULT_TYPE)) //批量更新 UpdateByQueryAction.INSTANCE.newRequestBuilder(esClient.getClient()) .refresh(true) .abortOnVersionConflict(false) .source(index) .script(new Script( ScriptType.INLINE, "painless", "ctx._source.expireFlag = 1", Collections.emptyMap() )) .filter(qb) .execute(new ActionListener() { //回调监听 @Override public void onResponse(BulkByScrollResponse response) { if (response.getUpdated() > 0L) { logger.info("修改过期:" + response.getUpdated()) } } @Override public void onFailure(Exception e) { // Handle the exception logger.error(e.getMessage(), e) } }) } as Runnable } //groovy最后一个表达式的值为返回 createRunnable(esClient, keysService, logger, supplierCode)