|
|
@@ -0,0 +1,136 @@
|
|
|
+package com.elab.core.async.pruducer;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.elab.core.async.RealTaskExecutor;
|
|
|
+import com.elab.core.async.SchedulingTaskExecutor;
|
|
|
+import com.elab.core.async.TaskStoreProcess;
|
|
|
+import com.elab.core.async.model.TaskStoreData;
|
|
|
+import com.elab.core.async.store.TaskExecutorDecoration;
|
|
|
+import com.elab.core.async.store.TaskExecutorQueue;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ArrayBlockingQueue;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.function.Function;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 实时生产者
|
|
|
+ *
|
|
|
+ * @author : liukx
|
|
|
+ * @time : 2020/8/6 - 11:17
|
|
|
+ */
|
|
|
+public class TaskProducer implements ITaskProducer {
|
|
|
+
|
|
|
+ private Logger logger = LoggerFactory.getLogger(TaskProducer.class);
|
|
|
+
|
|
|
+ private TaskExecutorQueue taskExecutorQueue;
|
|
|
+
|
|
|
+ private TaskStoreProcess handler;
|
|
|
+
|
|
|
+ public TaskProducer(TaskExecutorQueue taskExecutorQueue) {
|
|
|
+ this.taskExecutorQueue = taskExecutorQueue;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setHandler(TaskStoreProcess handler) {
|
|
|
+ this.handler = handler;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void sendRealTimeQueue(RealTaskExecutor taskExecutor) {
|
|
|
+ sendRealTimeQueue(taskExecutor, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void sendRealTimeQueue(RealTaskExecutor taskExecutor, TaskStoreData taskStoreData) {
|
|
|
+ if (taskStoreData != null && taskStoreData.isSave()) {
|
|
|
+ handAdd(taskStoreData);
|
|
|
+ }
|
|
|
+ TaskExecutorDecoration taskExecutorDecoration = new TaskExecutorDecoration(taskStoreData, taskExecutor);
|
|
|
+ boolean isConsumer = this.taskExecutorQueue.getRealTimeQueue().offer(taskExecutorDecoration);
|
|
|
+ if (!isConsumer) {
|
|
|
+ logger.info("队列满了!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handAdd(TaskStoreData taskStoreData) {
|
|
|
+ if (this.handler != null) {
|
|
|
+ this.handler.add(taskStoreData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public ThreadPoolExecutor getExecutor() {
|
|
|
+ return new ThreadPoolExecutor(5, 5,
|
|
|
+ 0L, TimeUnit.MILLISECONDS,
|
|
|
+ new ArrayBlockingQueue<>(20));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Map<String, Object> reduce(List<?> calculateList, Function<List<?>, Map<String, Object>> function) {
|
|
|
+ ThreadPoolExecutor executor = getExecutor();
|
|
|
+ try {
|
|
|
+ int size = calculateList.size();
|
|
|
+ if (size <= 5) {
|
|
|
+ CompletableFuture<Map<String, Object>> mapCompletableFuture = CompletableFuture.supplyAsync(() -> {
|
|
|
+ return function.apply(calculateList);
|
|
|
+ }, executor);
|
|
|
+ return mapCompletableFuture.get();
|
|
|
+ } else {
|
|
|
+ Map<String, Object> dataMap = new HashMap<>();
|
|
|
+ int batchCount = size / 5;
|
|
|
+
|
|
|
+ if (size % 5 != 0) {
|
|
|
+ batchCount++;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<? extends List<?>> partitionList = Lists.partition(calculateList, 5);
|
|
|
+// CompletableFuture<Map<String, Object>>[] listFuture = new CompletableFuture[batchCount];
|
|
|
+ List<CompletableFuture<Map<String, Object>>> list = new ArrayList<>();
|
|
|
+ CompletableFuture<List<CompletableFuture<Map<String, Object>>>> listCompletableFuture = CompletableFuture.completedFuture(list);
|
|
|
+ for (int i = 0; i < batchCount; i++) {
|
|
|
+ int partitionIndex = i;
|
|
|
+ list.add(CompletableFuture.supplyAsync(() -> {
|
|
|
+ Map<String, Object> apply = function.apply(partitionList.get(partitionIndex));
|
|
|
+ return apply;
|
|
|
+ }, executor));
|
|
|
+ }
|
|
|
+// CompletableFuture<Void> completableFuture = CompletableFuture.allOf(listFuture);
|
|
|
+// completableFuture.get();
|
|
|
+
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ CompletableFuture<Map<String, Object>> mapCompletableFuture = list.get(i);
|
|
|
+ Map<String, Object> stringObjectMap = mapCompletableFuture.get();
|
|
|
+ System.out.println("-->" + JSON.toJSONString(stringObjectMap));
|
|
|
+ stringObjectMap.forEach((K, V) -> {
|
|
|
+ if (dataMap.get(K) == null) {
|
|
|
+ dataMap.put(K, V);
|
|
|
+ } else {
|
|
|
+ if (V instanceof List) {
|
|
|
+ List mapList = (List) dataMap.get(K);
|
|
|
+ mapList.addAll((Collection) V);
|
|
|
+ dataMap.put(K, mapList);
|
|
|
+ } else {
|
|
|
+ dataMap.put(K, V);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return dataMap;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void sendSchedulingList(SchedulingTaskExecutor schedulingTaskExecutor) {
|
|
|
+ this.taskExecutorQueue.getSchedulingList().add(schedulingTaskExecutor);
|
|
|
+ }
|
|
|
+}
|