Selaa lähdekoodia

多线程工具类,websocket

18792927508 2 vuotta sitten
vanhempi
commit
dbefa0b24e

+ 4
- 1
ruoyi-common/pom.xml Näytä tiedosto

@@ -184,7 +184,10 @@
184 184
             <artifactId>spring-boot-starter-mail</artifactId>
185 185
             <version>3.1.4</version>
186 186
         </dependency>
187
-
187
+        <dependency>
188
+            <groupId>org.springframework.boot</groupId>
189
+            <artifactId>spring-boot-starter-websocket</artifactId>
190
+        </dependency>
188 191
         <dependency>
189 192
             <groupId>javax.activation</groupId>
190 193
             <artifactId>activation</artifactId>

+ 13
- 0
ruoyi-common/src/main/java/com/ruoyi/common/annotation/VoidFunction.java Näytä tiedosto

@@ -0,0 +1,13 @@
1
+package com.ruoyi.common.annotation;
2
+/**
3
+ * 一个参数、没有返回
4
+ * @author wangqiong
5
+ */
6
+@FunctionalInterface
7
+public interface VoidFunction<T> {
8
+    /**
9
+     * 有一个参数
10
+     * @param param
11
+     */
12
+    void apply(T param);
13
+}

+ 1
- 0
ruoyi-common/src/main/java/com/ruoyi/common/constant/Constants.java Näytä tiedosto

@@ -129,6 +129,7 @@ public class Constants
129 129
      */
130 130
     public static final String LOOKUP_LDAPS = "ldaps:";
131 131
     public static final String DEFAULT_PASSWORD = "123456";
132
+    public static final String SPLIT_COMMA =",";
132 133
 
133 134
     /**
134 135
      * 自动识别json对象白名单配置(仅允许解析的包名,范围越小越安全)

+ 22
- 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/thread/MultipleThreadListParam.java Näytä tiedosto

@@ -0,0 +1,22 @@
1
+package com.ruoyi.common.utils.thread;
2
+
3
+import lombok.Data;
4
+
5
+import java.util.List;
6
+import java.util.function.Function;
7
+
8
+/**
9
+ * @author wangqiong
10
+ * @date 2023/11/25 9:25
11
+ **/
12
+@Data
13
+public class MultipleThreadListParam<T,R> {
14
+    /**需要执行的方法*/
15
+    private Function<List<T>,R> function;
16
+    /**参数ID*/
17
+    private List<T> list;
18
+    public MultipleThreadListParam(Function<List<T>,R> function, List<T> list){
19
+        this.function=function;
20
+        this.list=list;
21
+    }
22
+}

+ 21
- 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/thread/MultipleThreadStringParam.java Näytä tiedosto

@@ -0,0 +1,21 @@
1
+package com.ruoyi.common.utils.thread;
2
+
3
+import lombok.Data;
4
+
5
+import java.util.function.Function;
6
+
7
+/**
8
+ * @author wangqiong
9
+ * @date 2023/11/25 9:25
10
+ **/
11
+@Data
12
+public class MultipleThreadStringParam<T> {
13
+    /**需要执行的方法*/
14
+    private Function<String,T> function;
15
+    /**参数ID*/
16
+    private String ids;
17
+    public MultipleThreadStringParam(Function<String,T> function, String ids){
18
+        this.function=function;
19
+        this.ids=ids;
20
+    }
21
+}

+ 547
- 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/thread/MultipleThreadWorkUtil.java Näytä tiedosto

@@ -0,0 +1,547 @@
1
+package com.ruoyi.common.utils.thread;
2
+
3
+import cn.hutool.core.collection.CollectionUtil;
4
+import cn.hutool.core.util.ArrayUtil;
5
+import cn.hutool.core.util.StrUtil;
6
+import com.ruoyi.common.annotation.VoidFunction;
7
+import com.ruoyi.common.constant.Constants;
8
+import com.ruoyi.common.exception.ServiceException;
9
+import lombok.Data;
10
+import lombok.extern.slf4j.Slf4j;
11
+import org.springframework.transaction.annotation.Transactional;
12
+
13
+import java.util.*;
14
+import java.util.concurrent.*;
15
+import java.util.function.Function;
16
+
17
+/**
18
+ * @description 多线程操作数据库,一个线程异常全部回滚
19
+ * @Author wangqiong
20
+ * @Date 2023/11/25 14:02
21
+ * @Version V1.0
22
+ **/
23
+@Slf4j
24
+public class MultipleThreadWorkUtil {
25
+    private static  int SIMPLE_TIME_COUNT=1000;
26
+
27
+    public static <R,A>List<R> exec(Function<List<A>,R> execFun, List<A> list){
28
+        List<R> returnList=new ArrayList<>();
29
+        if(CollectionUtil.isEmpty(list)){
30
+            return returnList;
31
+        }
32
+        if(list.size()<SIMPLE_TIME_COUNT){
33
+            return Arrays.asList(execFun.apply(list));
34
+        }
35
+        int times=list.size()/SIMPLE_TIME_COUNT;
36
+        if(list.size()%SIMPLE_TIME_COUNT>0){
37
+            times++;
38
+        }
39
+        CountDownLatch mainLatch=new CountDownLatch(1);
40
+        //监控子线程
41
+        CountDownLatch threadLatch=new CountDownLatch(times);
42
+        //根据子线程执行结果判断是否需要回滚
43
+        BlockingDeque<Boolean>  resultList=new LinkedBlockingDeque<>();
44
+        //必须使用对象,如果使用变量会造成线程之间不能共享变量值
45
+        RollBack rollBack=new RollBack(false);
46
+        ExecutorService executorService=Executors.newFixedThreadPool(times);
47
+        List<Future<R>> futureList=new ArrayList<>();
48
+        for (int i = 0; i <times ; i++) {
49
+            if(i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT<list.size()){
50
+                // 创建子线程
51
+                Future<R> future=executorService.submit(new ExecThread(mainLatch,threadLatch,rollBack,resultList,list.subList(i*SIMPLE_TIME_COUNT,i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT),execFun));
52
+                futureList.add(future);
53
+            }else{
54
+                Future<R> future=executorService.submit(new ExecThread(mainLatch,threadLatch,rollBack,resultList,list.subList(i*SIMPLE_TIME_COUNT,list.size()),execFun));
55
+                futureList.add(future);
56
+            }
57
+        }
58
+        setResult(executorService,returnList,futureList,resultList,
59
+                mainLatch,threadLatch,rollBack,times);
60
+        return returnList;
61
+    }
62
+
63
+    public static <A>void exec(VoidFunction<Set<A>> execFun, Set<A> set){
64
+        if(set.size()<SIMPLE_TIME_COUNT){
65
+            execFun.apply(set);
66
+            return;
67
+        }
68
+        int times=set.size()/SIMPLE_TIME_COUNT;
69
+        if(set.size()%SIMPLE_TIME_COUNT>0){
70
+            times++;
71
+        }
72
+        CountDownLatch mainLatch=new CountDownLatch(1);
73
+        //监控子线程
74
+        CountDownLatch threadLatch=new CountDownLatch(times);
75
+        //根据子线程执行结果判断是否需要回滚
76
+        BlockingDeque<Boolean>  resultList=new LinkedBlockingDeque<>();
77
+        //必须使用对象,如果使用变量会造成线程之间不能共享变量值
78
+        RollBack rollBack=new RollBack(false);
79
+        ExecutorService executorService=Executors.newFixedThreadPool(times);
80
+        List<A> list=new ArrayList<>(set);
81
+        List<Future> futureList=new ArrayList<>();
82
+        for (int i = 0; i <times ; i++) {
83
+            Future future;
84
+            if(i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT<list.size()){
85
+                future=executorService.submit(new VoidExecThread(mainLatch,threadLatch,rollBack,resultList,list.subList(i*SIMPLE_TIME_COUNT,i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT),execFun));
86
+            }else{
87
+                future=executorService.submit(new VoidExecThread(mainLatch,threadLatch,rollBack,resultList,list.subList(i*SIMPLE_TIME_COUNT,list.size()),execFun));
88
+            }
89
+            futureList.add(future);
90
+        }
91
+        /**存放子线程返回结果*/
92
+        List<Boolean> backUpResult=new ArrayList<>();
93
+        try{
94
+            //
95
+            boolean await=threadLatch.await(times*3,TimeUnit.SECONDS);
96
+            if(!await){
97
+                rollBack.setRollBack(true);
98
+            }else{
99
+                //查看执行情况,如果有存在需要回滚的线程,则全部回滚
100
+                for (int i = 0; i <times ; i++) {
101
+                    Boolean result=resultList.take();
102
+                    backUpResult.add(result);
103
+                    if(result){
104
+                        /**有线程执行异常,需要回滚子线程*/
105
+                        rollBack.setRollBack(true);
106
+                    }
107
+                }
108
+            }
109
+        }catch (InterruptedException e){
110
+            e.printStackTrace();
111
+            throw new ServiceException(e.getMessage());
112
+        }finally {
113
+            //子线程再次继续执行
114
+            mainLatch.countDown();
115
+            executorService.shutdown();
116
+        }
117
+        /**检查子线程是否有异常,有异常整体回滚*/
118
+        for (int i = 0; i <times ; i++) {
119
+            if(CollectionUtil.isNotEmpty(backUpResult)){
120
+                Boolean result=backUpResult.get(i);
121
+                if(result){
122
+                    /**有线程执行异常,需要回滚子线程*/
123
+                    throw new ServiceException("多线程执行异常");
124
+                }
125
+            }else{
126
+                throw new ServiceException("多线程执行异常");
127
+            }
128
+        }
129
+        for (Future future : futureList) {
130
+            try {
131
+                future.get();
132
+            } catch (Exception e) {
133
+                throw new ServiceException(e.getMessage());
134
+            }
135
+        }
136
+    }
137
+
138
+   
139
+
140
+    public static <R>List<R> execFun(MultipleThreadStringParam<R>...params){
141
+        List<R> returnList=new ArrayList<>();
142
+        if(ArrayUtil.isEmpty(params)){
143
+            return returnList;
144
+        }
145
+        List<Integer> threadCountList=new ArrayList<>();
146
+        for (MultipleThreadStringParam param : params) {
147
+            List<String> idList= Arrays.asList(param.getIds().split(Constants.SPLIT_COMMA));
148
+            if(idList.size()<SIMPLE_TIME_COUNT){
149
+                threadCountList.add(1);
150
+            }else{
151
+                if(idList.size()%SIMPLE_TIME_COUNT>0){
152
+                    threadCountList.add(idList.size()/SIMPLE_TIME_COUNT+1);
153
+                }else{
154
+                    threadCountList.add(idList.size()/SIMPLE_TIME_COUNT);
155
+                }
156
+            }
157
+        }
158
+        int times=0;
159
+        for (Integer count : threadCountList) {
160
+            times+=count;
161
+        }
162
+        CountDownLatch mainLatch=new CountDownLatch(1);
163
+        //监控子线程
164
+        CountDownLatch threadLatch=new CountDownLatch(times);
165
+        //根据子线程执行结果判断是否需要回滚
166
+        BlockingDeque<Boolean>  resultList=new LinkedBlockingDeque<>();
167
+        //必须使用对象,如果使用变量会造成线程之间不能共享变量值
168
+        RollBack rollBack=new RollBack(false);
169
+        ExecutorService executorService=Executors.newFixedThreadPool(times);
170
+        List<Future<R>> futureList=new ArrayList<>();
171
+        for (int i = 0; i < params.length; i++) {
172
+            MultipleThreadStringParam param=params[i];
173
+            List<String> idList= Arrays.asList(param.getIds().split(Constants.SPLIT_COMMA));
174
+            for (int j = 0; j <threadCountList.get(i) ; j++) {
175
+                if(j*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT<idList.size()){
176
+                    Future<R> future=executorService.submit(new ExecByIdsStringThread<>(mainLatch,threadLatch,rollBack,resultList,StrUtil.join(Constants.SPLIT_COMMA,idList.subList(j*SIMPLE_TIME_COUNT,j*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT)),param.getFunction()));
177
+                    futureList.add(future);
178
+                }else{
179
+                    Future<R> future=executorService.submit(new ExecByIdsStringThread(mainLatch,threadLatch,rollBack,resultList,StrUtil.join(Constants.SPLIT_COMMA,idList.subList(j*SIMPLE_TIME_COUNT,idList.size())),param.getFunction()));
180
+                    futureList.add(future);
181
+                }
182
+            }
183
+        }
184
+        setResult(executorService,returnList,futureList,resultList,
185
+                mainLatch,threadLatch,rollBack,times);
186
+        return returnList;
187
+    }
188
+
189
+    public static <R>List<R> execByIds(Function<String,R> execFun,String ids){
190
+        List<R> returnList=new ArrayList<>();
191
+        if(StrUtil.isEmpty(ids)){
192
+            return returnList;
193
+        }
194
+        List<String> idList= Arrays.asList(ids.split(Constants.SPLIT_COMMA));
195
+        if(idList.size()<SIMPLE_TIME_COUNT){
196
+            return Arrays.asList(execFun.apply(ids));
197
+        }
198
+        int times=idList.size()/SIMPLE_TIME_COUNT;
199
+        if(idList.size()%SIMPLE_TIME_COUNT>0){
200
+            times++;
201
+        }
202
+        CountDownLatch mainLatch=new CountDownLatch(1);
203
+        //监控子线程
204
+        CountDownLatch threadLatch=new CountDownLatch(times);
205
+        //根据子线程执行结果判断是否需要回滚
206
+        BlockingDeque<Boolean>  resultList=new LinkedBlockingDeque<>();
207
+        //必须使用对象,如果使用变量会造成线程之间不能共享变量值
208
+        RollBack rollBack=new RollBack(false);
209
+        ExecutorService executorService=Executors.newFixedThreadPool(times);
210
+        List<Future<R>> futureList=new ArrayList<>();
211
+        for (int i = 0; i <times ; i++) {
212
+            if(i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT<idList.size()){
213
+                Future<R> future=executorService.submit(new ExecByIdsStringThread<>(mainLatch,threadLatch,rollBack,resultList,StrUtil.join(Constants.SPLIT_COMMA,idList.subList(i*SIMPLE_TIME_COUNT,i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT)),execFun));
214
+                futureList.add(future);
215
+            }else{
216
+                Future<R> future=executorService.submit(new ExecByIdsStringThread(mainLatch,threadLatch,rollBack,resultList,StrUtil.join(Constants.SPLIT_COMMA,idList.subList(i*SIMPLE_TIME_COUNT,idList.size())),execFun));
217
+                futureList.add(future);
218
+            }
219
+        }
220
+        setResult(executorService,returnList,futureList,resultList,
221
+                mainLatch,threadLatch,rollBack,times);
222
+        return returnList;
223
+    }
224
+
225
+    public static <R>List<R> execByIds(Function<List<String>,List<R>> execFun,List<String> idList){
226
+        List<R> returnList=new ArrayList<>();
227
+        if(CollectionUtil.isEmpty(idList)){
228
+            return returnList;
229
+        }
230
+        if(idList.size()<SIMPLE_TIME_COUNT){
231
+            return execFun.apply(idList);
232
+        }
233
+        int times=idList.size()/SIMPLE_TIME_COUNT;
234
+        if(idList.size()%SIMPLE_TIME_COUNT>0){
235
+            times++;
236
+        }
237
+        CountDownLatch mainLatch=new CountDownLatch(1);
238
+        //监控子线程
239
+        CountDownLatch threadLatch=new CountDownLatch(times);
240
+        //根据子线程执行结果判断是否需要回滚
241
+        BlockingDeque<Boolean>  resultList=new LinkedBlockingDeque<>();
242
+        //必须使用对象,如果使用变量会造成线程之间不能共享变量值
243
+        RollBack rollBack=new RollBack(false);
244
+        ExecutorService executorService=Executors.newFixedThreadPool(times);
245
+        List<Future<List<R>>> futureList=new ArrayList<>();
246
+        for (int i = 0; i <times ; i++) {
247
+            if(i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT<idList.size()){
248
+                Future<List<R>> future=executorService.submit(new ExecByIdsStringListThread<>(mainLatch,threadLatch,rollBack,resultList,idList.subList(i*SIMPLE_TIME_COUNT,i*SIMPLE_TIME_COUNT+SIMPLE_TIME_COUNT),execFun));
249
+                futureList.add(future);
250
+            }else{
251
+                Future<List<R>> future=executorService.submit(new ExecByIdsStringListThread(mainLatch,threadLatch,rollBack,resultList,idList.subList(i*SIMPLE_TIME_COUNT,idList.size()),execFun));
252
+                futureList.add(future);
253
+            }
254
+        }
255
+        /**存放子线程返回结果*/
256
+        List<Boolean> backUpResult=new ArrayList<>();
257
+        try{
258
+            //
259
+            boolean await=threadLatch.await(times*3,TimeUnit.SECONDS);
260
+            if(!await){
261
+                rollBack.setRollBack(true);
262
+            }else{
263
+                //查看执行情况,如果有存在需要回滚的线程,则全部回滚
264
+                for (int i = 0; i <times ; i++) {
265
+                    Boolean result=resultList.take();
266
+                    backUpResult.add(result);
267
+                    if(result){
268
+                        /**有线程执行异常,需要回滚子线程*/
269
+                        rollBack.setRollBack(true);
270
+                    }
271
+                }
272
+            }
273
+        }catch (InterruptedException e){
274
+            e.printStackTrace();
275
+            throw new ServiceException("多线程执行异常");
276
+        }finally {
277
+            //子线程再次继续执行
278
+            mainLatch.countDown();
279
+            executorService.shutdown();
280
+        }
281
+        /**检查子线程是否有异常,有异常整体回滚*/
282
+        for (int i = 0; i <times ; i++) {
283
+            if(CollectionUtil.isNotEmpty(backUpResult)){
284
+                Boolean result=backUpResult.get(i);
285
+                if(result){
286
+                    /**有线程执行异常,需要回滚子线程*/
287
+                    throw new ServiceException("多线程执行异常");
288
+                }
289
+            }else{
290
+                throw new ServiceException("多线程执行异常");
291
+            }
292
+        }
293
+        for (Future<List<R>> future : futureList) {
294
+            try {
295
+                returnList.addAll(future.get());
296
+            } catch (Exception e) {
297
+                throw new ServiceException("多线程执行异常");
298
+            }
299
+        }
300
+        return returnList;
301
+    }
302
+
303
+    private static <R>void setResult(ExecutorService executorService,List<R> returnList,List<Future<R>> futureList,BlockingDeque<Boolean>  resultList
304
+            ,CountDownLatch mainLatch,CountDownLatch threadLatch,RollBack rollBack,int times){
305
+        /**存放子线程返回结果*/
306
+        List<Boolean> backUpResult=new ArrayList<>();
307
+        try{
308
+            //
309
+            boolean await=threadLatch.await(times*3,TimeUnit.SECONDS);
310
+            if(!await){
311
+                rollBack.setRollBack(true);
312
+            }else{
313
+                //查看执行情况,如果有存在需要回滚的线程,则全部回滚
314
+                for (int i = 0; i <times ; i++) {
315
+                    Boolean result=resultList.take();
316
+                    backUpResult.add(result);
317
+                    if(result){
318
+                        /**有线程执行异常,需要回滚子线程*/
319
+                        rollBack.setRollBack(true);
320
+                    }
321
+                }
322
+            }
323
+        }catch (InterruptedException e){
324
+            e.printStackTrace();
325
+            throw new ServiceException("多线程执行异常");
326
+        }finally {
327
+            //子线程再次继续执行
328
+            mainLatch.countDown();
329
+            executorService.shutdown();
330
+        }
331
+        /**检查子线程是否有异常,有异常整体回滚*/
332
+        for (int i = 0; i <times ; i++) {
333
+            if(CollectionUtil.isNotEmpty(backUpResult)){
334
+                Boolean result=backUpResult.get(i);
335
+                if(result){
336
+                    /**有线程执行异常,需要回滚子线程*/
337
+                    throw new ServiceException("多线程执行异常");
338
+                }
339
+            }else{
340
+                throw new ServiceException("多线程执行异常");
341
+            }
342
+        }
343
+        for (Future<R> future : futureList) {
344
+            try {
345
+                returnList.add(future.get());
346
+            } catch (Exception e) {
347
+                throw new ServiceException("多线程执行异常");
348
+            }
349
+        }
350
+    }
351
+
352
+    static class QueryThread<R> implements Callable<List<R>>{
353
+        private String ids;
354
+        private Function<String,List<R>> execFun;
355
+        public QueryThread(String ids,Function<String,List<R>> execFun){
356
+            this.ids=ids;
357
+            this.execFun=execFun;
358
+        }
359
+        @Override
360
+        public List<R> call(){
361
+            return execFun.apply(ids);
362
+        }
363
+    }
364
+
365
+    static class ExecThread<T,R> implements Callable<R>{
366
+        /**主线程监控*/
367
+        private CountDownLatch mainLatch;
368
+        /**子线程监控*/
369
+        private CountDownLatch threadLatch;
370
+        /**是否回滚*/
371
+        private RollBack rollBack;
372
+        private BlockingDeque<Boolean>  resultList;
373
+        private List<T> list;
374
+        private Function<List<T>,R> execFun;
375
+        public ExecThread(CountDownLatch mainLatch,CountDownLatch threadLatch,RollBack rollBack,BlockingDeque<Boolean>  resultList,List<T> list,Function<List<T>,R> execFun){
376
+            this.mainLatch=mainLatch;
377
+            this.threadLatch=threadLatch;
378
+            this.rollBack=rollBack;
379
+            this.resultList=resultList;
380
+            this.list=list;
381
+            this.execFun=execFun;
382
+        }
383
+        @Override
384
+        @Transactional(rollbackFor = Exception.class)
385
+        public R call(){
386
+            // 是否回滚
387
+            Boolean result=false;
388
+            R r=null;
389
+            try{
390
+                // 对数据库进行操作
391
+                r=execFun.apply(list);
392
+            }catch (Exception e){
393
+                e.printStackTrace();
394
+                result=true;
395
+            }
396
+            resultList.add(result);
397
+            // 子线程-1,切换到主线程执行
398
+            threadLatch.countDown();
399
+            try{
400
+                // 等待主线程执行
401
+                mainLatch.await();
402
+            }catch (InterruptedException e){
403
+                throw new ServiceException("多线程执行异常");
404
+            }
405
+            if(rollBack.getRollBack()){
406
+                throw new ServiceException("多线程执行异常");
407
+            }
408
+            return r;
409
+        }
410
+    }
411
+
412
+    static class VoidExecThread<T> implements Runnable{
413
+        /**主线程监控*/
414
+        private CountDownLatch mainLatch;
415
+        /**子线程监控*/
416
+        private CountDownLatch threadLatch;
417
+        /**是否回滚*/
418
+        private RollBack rollBack;
419
+        private BlockingDeque<Boolean>  resultList;
420
+        private List<T> list;
421
+        private VoidFunction<Set<T>> execFun;
422
+        public VoidExecThread(CountDownLatch mainLatch,CountDownLatch threadLatch,RollBack rollBack,BlockingDeque<Boolean>  resultList,List<T> list,VoidFunction<Set<T>> execFun){
423
+            this.mainLatch=mainLatch;
424
+            this.threadLatch=threadLatch;
425
+            this.rollBack=rollBack;
426
+            this.resultList=resultList;
427
+            this.list=list;
428
+            this.execFun=execFun;
429
+        }
430
+        @Override
431
+        public void run() {
432
+            Boolean result=false;
433
+            try{
434
+                execFun.apply(new HashSet<>(list));
435
+            }catch (Exception e){
436
+                e.printStackTrace();
437
+                result=true;
438
+            }
439
+            resultList.add(result);
440
+            threadLatch.countDown();
441
+            try{
442
+                mainLatch.await();
443
+            }catch (InterruptedException e){
444
+                throw new ServiceException("多线程执行异常");
445
+            }
446
+            if(rollBack.getRollBack()){
447
+                throw new ServiceException("多线程执行异常");
448
+            }
449
+        }
450
+    }
451
+
452
+
453
+    static class ExecByIdsStringThread<R> implements Callable<R>{
454
+        /**主线程监控*/
455
+        private CountDownLatch mainLatch;
456
+        /**子线程监控*/
457
+        private CountDownLatch threadLatch;
458
+        /**是否回滚*/
459
+        private RollBack rollBack;
460
+        private BlockingDeque<Boolean>  resultList;
461
+        private String ids;
462
+        private Function<String,R> execFun;
463
+        public ExecByIdsStringThread(CountDownLatch mainLatch,CountDownLatch threadLatch,RollBack rollBack,BlockingDeque<Boolean>  resultList,String ids,Function<String,R> execFun){
464
+            this.mainLatch=mainLatch;
465
+            this.threadLatch=threadLatch;
466
+            this.rollBack=rollBack;
467
+            this.resultList=resultList;
468
+            this.ids=ids;
469
+            this.execFun=execFun;
470
+        }
471
+        @Override
472
+        public R call(){
473
+            Boolean result=false;
474
+            R r=null;
475
+            try{
476
+                r=execFun.apply(ids);
477
+            }catch (Exception e){
478
+                e.printStackTrace();
479
+                result=true;
480
+            }
481
+            resultList.add(result);
482
+            threadLatch.countDown();
483
+            try{
484
+                mainLatch.await();
485
+            }catch (InterruptedException e){
486
+                throw new ServiceException("多线程执行异常");
487
+            }
488
+            if(rollBack.getRollBack()){
489
+                throw new ServiceException("多线程执行异常");
490
+            }
491
+            return r;
492
+        }
493
+    }
494
+
495
+
496
+
497
+
498
+    static class ExecByIdsStringListThread<R> implements Callable<R>{
499
+        /**主线程监控*/
500
+        private CountDownLatch mainLatch;
501
+        /**子线程监控*/
502
+        private CountDownLatch threadLatch;
503
+        /**是否回滚*/
504
+        private RollBack rollBack;
505
+        private BlockingDeque<Boolean>  resultList;
506
+        private List<String> ids;
507
+        private Function<List<String>,R> execFun;
508
+        public ExecByIdsStringListThread(CountDownLatch mainLatch,CountDownLatch threadLatch,RollBack rollBack,BlockingDeque<Boolean>  resultList,List<String> ids,Function<List<String>,R> execFun){
509
+            this.mainLatch=mainLatch;
510
+            this.threadLatch=threadLatch;
511
+            this.rollBack=rollBack;
512
+            this.resultList=resultList;
513
+            this.ids=ids;
514
+            this.execFun=execFun;
515
+        }
516
+        @Override
517
+        public R call(){
518
+            Boolean result=false;
519
+            R r=null;
520
+            try{
521
+                r=execFun.apply(ids);
522
+            }catch (Exception e){
523
+                e.printStackTrace();
524
+                result=true;
525
+            }
526
+            resultList.add(result);
527
+            threadLatch.countDown();
528
+            try{
529
+                mainLatch.await();
530
+            }catch (InterruptedException e){
531
+                throw new ServiceException("多线程执行异常");
532
+            }
533
+            if(rollBack.getRollBack()){
534
+                throw new ServiceException("多线程执行异常");
535
+            }
536
+            return r;
537
+        }
538
+    }
539
+
540
+    @Data
541
+    static class RollBack{
542
+        private Boolean rollBack;
543
+        public RollBack(Boolean rollBack){
544
+            this.rollBack=rollBack;
545
+        }
546
+    }
547
+}

+ 140
- 0
ruoyi-common/src/main/java/com/ruoyi/common/websocket/MyWebSocketHandler.java Näytä tiedosto

@@ -0,0 +1,140 @@
1
+package com.ruoyi.common.websocket;
2
+
3
+
4
+import com.ruoyi.common.exception.ServiceException;
5
+import lombok.extern.slf4j.Slf4j;
6
+import org.springframework.stereotype.Component;
7
+
8
+import javax.websocket.*;
9
+import javax.websocket.server.PathParam;
10
+import javax.websocket.server.ServerEndpoint;
11
+import java.io.IOException;
12
+import java.util.HashMap;
13
+import java.util.Map;
14
+import java.util.concurrent.CopyOnWriteArraySet;
15
+
16
+/**
17
+ * @author wangqiong
18
+ * @description
19
+ * @date 2023-11-25 15:16
20
+ */
21
+@Component
22
+@Slf4j
23
+@ServerEndpoint("/websocket/{userName}")
24
+public class MyWebSocketHandler {
25
+    // 接口路径 ws://127.0.0.1:9000/websocket;
26
+    private Session session;
27
+
28
+    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
29
+    private static CopyOnWriteArraySet<Session> sessions  = new CopyOnWriteArraySet<>();
30
+    // 用来存在线连接数
31
+    private static Map<String, Session> sessionPool = new HashMap<>();
32
+
33
+    /**
34
+     * 链接成功调用的方法
35
+     */
36
+    @OnOpen
37
+    public void onOpen(Session session, @PathParam(value = "userName") String userName) {
38
+        try {
39
+            sessions.add(session);
40
+            sessionPool.put(userName, session);
41
+
42
+        } catch (Exception e) {
43
+            e.printStackTrace();
44
+        }
45
+    }
46
+
47
+    /**
48
+     * 链接关闭调用的方法
49
+     */
50
+    @OnClose
51
+    public void onClose(Session session,@PathParam(value = "userName") String userName) {
52
+        try {
53
+            if(session!=null && session.isOpen()){
54
+                session.close();
55
+                sessions.remove(session);
56
+                sessionPool.remove(userName);
57
+            }
58
+
59
+            log.info("【websocket消息】连接断开,总数为:" + sessions.size());
60
+
61
+        } catch (Exception e) {
62
+            e.printStackTrace();
63
+        }
64
+    }
65
+
66
+    /**
67
+     * 收到客户端消息后调用的方法
68
+     *
69
+     * @param message
70
+     * @param
71
+     */
72
+    @OnMessage
73
+    public void onMessage(@PathParam(value = "userName") String userName, String message) {
74
+
75
+        System.out.println("【websocket消息】收到客户端消息:" + message);
76
+        // 将消息广播给其它用户
77
+        for (Session session : sessions) {
78
+            if(session!=null && session.isOpen()){
79
+                try {
80
+                    session.getBasicRemote().sendText(message);
81
+                }catch (Exception e){
82
+                    throw new ServiceException(e.getMessage());
83
+                }
84
+            }else {
85
+                try {
86
+                   session.close();
87
+                } catch (IOException e) {
88
+                    throw new ServiceException(e.getMessage());
89
+                }
90
+               sessions.remove(session);
91
+                sessionPool.remove(userName);
92
+            }
93
+        }
94
+
95
+    }
96
+
97
+    /**
98
+     * 发送错误时的处理
99
+     *
100
+     * @param session
101
+     * @param error
102
+     */
103
+    @OnError
104
+    public void onError(Session session, Throwable error) {
105
+
106
+        throw new ServiceException(error.getMessage());
107
+    }
108
+
109
+
110
+    /**
111
+     * 推消息给前端
112
+     *
113
+     * @param userId
114
+     * @param message
115
+     * @return
116
+     */
117
+    public static Runnable sendOneMessage(String userId, String message) {
118
+        Session session = sessionPool.get(userId);
119
+        if (session != null && session.isOpen()) {
120
+            try {
121
+                log.info("【推给前端消息】 :" + message);
122
+
123
+                //高并发下,防止session占用期间,被其他线程调用
124
+                synchronized (session) {
125
+                    session.getBasicRemote().sendText(message);
126
+                }
127
+
128
+            } catch (Exception e) {
129
+                e.printStackTrace();
130
+            }
131
+        }
132
+        return null;
133
+    }
134
+
135
+
136
+
137
+
138
+
139
+}
140
+

+ 26
- 0
ruoyi-common/src/main/java/com/ruoyi/common/websocket/WebSocketConfig.java Näytä tiedosto

@@ -0,0 +1,26 @@
1
+package com.ruoyi.common.websocket;//package com.ruoyi.common.websocket;
2
+
3
+import org.springframework.context.annotation.Bean;
4
+import org.springframework.context.annotation.Configuration;
5
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
6
+
7
+/**
8
+ * @author wangqiong
9
+ * @description
10
+ * @date 2023-11-25 15:19
11
+ */
12
+
13
+@Configuration
14
+public class WebSocketConfig {
15
+
16
+    /**
17
+     * 注入ServerEndpointExporter,
18
+     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
19
+     */
20
+
21
+    @Bean
22
+    public ServerEndpointExporter serverEndpointExporter() {
23
+        return new ServerEndpointExporter();
24
+    }
25
+
26
+}

+ 1
- 1
ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java Näytä tiedosto

@@ -111,7 +111,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter
111 111
                 // 过滤请求
112 112
                 .authorizeRequests()
113 113
                 // 对于登录login 注册register 验证码captchaImage 允许匿名访问
114
-                .antMatchers("/login", "/register", "/captchaImage","/uploadPath/**").permitAll()
114
+                .antMatchers("/login", "/register", "/captchaImage","/uploadPath/**","/websocket/**").permitAll()
115 115
                 // 静态资源,可匿名访问
116 116
                 .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll()
117 117
                 .antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**").permitAll()