-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmain_adb.py
More file actions
2323 lines (2172 loc) · 108 KB
/
main_adb.py
File metadata and controls
2323 lines (2172 loc) · 108 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import cv2
import numpy as np
import time
import random
import threading
import sys
import os
import gc
import traceback
from adb_controller import AdbController, woa_debug_set_runtime_started, save_image_safe, read_image_safe
from simple_ocr import StopSignal, SimpleOCR
def get_resource_path(relative_path):
    """Join *relative_path* onto the directory that holds bundled resources.

    When ``sys.frozen`` is set the root is ``sys._MEIPASS`` if that attribute
    exists, otherwise the directory of ``sys.executable``. In a normal
    interpreter run the root is the directory containing this source file.
    """
    if not getattr(sys, 'frozen', False):
        root = os.path.dirname(os.path.abspath(__file__))
    elif hasattr(sys, '_MEIPASS'):
        root = sys._MEIPASS
    else:
        root = os.path.dirname(sys.executable)
    return os.path.join(root, relative_path)
class WoaBot:
def _check_running(self):
if not self.running:
raise StopSignal()
def __init__(self, log_callback=None, config_callback=None, instance_id=1):
    """Initialise bot state, tuning constants and the task-icon templates.

    No device connection is made here: ``self.adb`` and ``self.ocr`` stay
    ``None`` until ``start()`` creates them.

    Args:
        log_callback: Optional callable that ``log()`` invokes with every
            non-empty message.
        config_callback: Optional callable; ``start()`` wires it up as
            ``config_callback("mumu_path", folder)``.
        instance_id: Identifier forwarded to ``AdbController`` by ``start()``.
    """
    self.instance_id = instance_id
    self.last_staff_log_time = 0
    self.config_callback = config_callback
    self.adb = None
    self.target_device = None
    self.running = False
    self._worker_thread = None
    self.log_callback = log_callback
    # Template images are looked up as icon_path + file name, hence the trailing separator.
    self.icon_path = get_resource_path('icon') + os.sep
    self.last_staff_shortage_time = 0
    # All pixel coordinates below are for a 1600x900 frame (start() rejects any other size).
    self.CLOSE_X = 1153
    self.CLOSE_Y = 181
    self.ocr = None
    self.stand_skip_index = 0
    self.in_staff_shortage_mode = False
    self.enable_bonus_staff = False
    self.last_bonus_staff_time = 0
    self.BONUS_COOLDOWN = 2 * 60 * 60
    # Regions are (x, y, w, h); width/height are written as differences of corner coordinates.
    self.REGION_GLOBAL_STAFF = (573, 92, 640 - 573, 112 - 92)
    self.REGION_TASK_COST = (270, 670, 330 - 270, 695 - 670)
    self.REGION_GREEN_DOT = (405, 517, 201, 109)
    self.next_bonus_retry_time = 0
    self.doing_task_forbidden_until = 0
    self.next_list_refresh_time = 0
    self.enable_vehicle_buy = False
    self.enable_speed_mode = False
    self.enable_skip_staff = False
    self.enable_delay_bribe = False
    self.enable_random_task = False
    self.control_method = "minitouch"
    self.screenshot_method = "nemu_ipc"
    self.mumu_path = ""
    # Slider duration bounds; set_slide_duration_range() logs them in milliseconds.
    self.slide_min_duration = 250
    self.slide_max_duration = 500
    self.REGION_MAIN_ANCHOR = (30, 30, 55, 45)
    self.REGION_REWARD_RECOVERY = (308, 428, 1007, 311)
    # Buttons that _periodic_15s_check() searches for inside REGION_REWARD_RECOVERY.
    self.REWARD_FLOW_BUTTONS = ['get_award_1.png', 'get_award_2.png', 'get_award_3.png', 'get_award_4.png',
                                'push_back.png', 'taxi_to_runway.png', 'start_general.png', 'error_ok.png']
    self.last_seen_main_interface_time = time.time()
    self.STUCK_TIMEOUT = 15.0
    self.auto_delay_count = 0
    self.TOWER_CHECK_POINTS = [(656, 809), (634, 831), (634, 809), (656, 830)]
    # BGR: red (delay needed), green (no delay needed), gray (tower closed)
    self.TOWER_RED_BGR = (110, 112, 251)
    self.TOWER_GREEN_BGR = (153, 219, 94)
    self.TOWER_OFF_COLOR = (128, 111, 94)
    # OCR regions (x, y, w, h) for the countdowns of the four controllers in the tower menu;
    # their top-right corners are (320,387) (320,491) (320,595) (320,699)
    self.TOWER_TIME_REGIONS = [
        (320 - 110, 387, 110, 18),
        (320 - 110, 491, 110, 18),
        (320 - 110, 595, 110, 18),
        (320 - 110, 699, 110, 18),
    ]
    # Tower countdown timer: expiry timestamp (0 means not set)
    self._tower_delay_deadline = 0.0
    # Coordinates of the four delay buttons (controllers 1-4)
    self.TOWER_DELAY_BUTTONS = [(362, 376), (362, 479), (362, 583), (362, 688)]
    # "Delay all" button
    self.TOWER_DELAY_ALL_BTN = (362, 785)
    # Which controllers are active (determined at startup)
    self._tower_active_slots = [False, False, False, False]
    # Whether the tower has been confirmed closed (no controller enabled)
    self._tower_disabled = False
    # Tower icon ROI (x, y, w, h)
    self.TOWER_ICON_ROI = (549, 794, 53, 55)
    # Whether the tower was ever active (used by the "filter all when the tower is closed" feature)
    self._tower_was_active = False
    # Force filter mode 1 after the tower closes (only triggered on an open -> closed transition)
    self._tower_off_force_mode1 = False
    # Reference BGR colours used by _is_pixel_light() / _is_pixel_dark().
    self.COLOR_LIGHT = (203, 191, 179)
    self.COLOR_DARK = (101, 85, 70)
    self.FILTER_MENU_BTN = (1537, 37)
    # Each entry is ((x, y), want_light): the expected light/dark state of a filter toggle.
    self.FILTER_CHECK_POINTS_MODE1 = [
        ((1542, 190), True), ((1535, 118), True), ((1540, 261), True),
        ((1533, 331), True), ((1537, 403), True), ((1542, 474), False)
    ]
    self.FILTER_CHECK_POINTS_MODE2 = [
        ((1542, 190), False), ((1535, 118), True), ((1540, 261), True),
        ((1533, 331), True), ((1537, 403), True), ((1542, 474), False)
    ]
    self.enable_no_takeoff_mode = False
    self.enable_cancel_stand_filter = False
    self.FILTER_POINT_A = (1535, 118)
    self.FILTER_POINT_B = (1542, 190)
    # 0 selects FILTER_POINT_A, 1 selects FILTER_POINT_B (see _do_filter_switch()).
    self._filter_side = 0
    self._filter_side_switch_time = 0.0
    self._filter_no_pending_switch = False
    self._filter_no_pending_next_switch_time = 0.0
    # Request flags raised by set_no_takeoff_mode(); presumably consumed by the main loop — TODO confirm.
    self._request_apply_mode3 = False
    self._request_switch_mode1 = False
    self._no_takeoff_logout_min = 0.0
    self._no_takeoff_logout_max = 0.0
    self._filter_switch_min = 3.0
    self._filter_switch_max = 6.0
    self._no_takeoff_logout_next_time = 0.0
    self._stat_approach = 0
    self._stat_depart = 0
    self._stat_stand_count = 0
    self._stat_stand_staff = 0
    self._stat_session_approach = 0
    self._stat_session_depart = 0
    self._stat_session_stand_count = 0
    self._stat_session_stand_staff = 0
    self._stat_date = None
    self._stat_last_required_cost = None
    self.REGION_STATUS_TITLE = (20, 320, 190, 250)
    self.LIST_ROI_X = 1312
    self.LIST_ROI_W = 60
    self.LIST_ROI_H = 900
    self.REGION_BOTTOM_ROI = (20, 750, 340, 130)
    self.REGION_VACANT_ROI = (390, 690, 730, 150)
    # Anti-stuck state
    self.consecutive_timeout_count = 0
    self.last_recovery_time = 0  # timestamp of the last recovery, used for the cooldown
    self.last_window_close_time = time.time()
    self.last_checked_avail_staff = -1
    self.last_read_success = False
    self.thinking_mode = 0
    self.thinking_range = (0, 0)
    # Per-template search region; wait_and_click() crops the screenshot to it before matching.
    self.ICON_ROIS = {
        'cross_runway.png': self.REGION_BOTTOM_ROI,
        'get_award_1.png': self.REGION_BOTTOM_ROI,
        'get_award_2.png': self.REGION_REWARD_RECOVERY,
        'get_award_3.png': self.REGION_REWARD_RECOVERY,
        'get_award_4.png': self.REGION_REWARD_RECOVERY,
        'landing_permitted.png': self.REGION_BOTTOM_ROI,
        'landing_prohibited.png': self.REGION_BOTTOM_ROI,
        'push_back.png': self.REGION_BOTTOM_ROI,
        'stand_confirm.png': self.REGION_BOTTOM_ROI,
        'start_ground_support.png': self.REGION_BOTTOM_ROI,
        'start_ice.png': self.REGION_BOTTOM_ROI,
        'takeoff.png': self.REGION_BOTTOM_ROI,
        'takeoff_by_gliding.png': self.REGION_BOTTOM_ROI,
        'taxi_to_runway.png': self.REGION_BOTTOM_ROI,
        'start_general.png': self.REGION_BOTTOM_ROI,
        'wait.png': self.REGION_BOTTOM_ROI,
        'go_repair.png': self.REGION_BOTTOM_ROI,
        'start_repair.png': self.REGION_BOTTOM_ROI,
        'ground_support_done.png': self.REGION_BOTTOM_ROI,
        'stand_vacant.png': self.REGION_VACANT_ROI,
        'green_dot.png': self.REGION_GREEN_DOT
    }
    # Preload the task-list templates once; files that do not exist are skipped,
    # and _fast_locate_all() returns no matches for a missing template.
    self.task_templates = {}
    task_files = [
        'pending_ice.png', 'pending_repair.png', 'pending_doing.png',
        'pending_approach.png', 'pending_taxiing.png', 'pending_takeoff.png',
        'pending_stand.png'
    ]
    for tf in task_files:
        p = self.icon_path + tf
        if os.path.exists(p):
            self.task_templates[tf] = read_image_safe(p)
def set_random_task_mode(self, enabled, log_change=True):
    """Turn random task selection on or off.

    Does nothing when the flag already has the requested value. The change
    is logged unless *log_change* is false.
    """
    if enabled != self.enable_random_task:
        self.enable_random_task = enabled
        if log_change:
            state = '已开启' if enabled else '已关闭'
            self.log(f">>> [配置] 随机任务选择: {state}")
def set_no_takeoff_mode(self, enabled):
    """Toggle no-takeoff mode and raise the matching filter request flag.

    Enabling sets ``_request_apply_mode3``; disabling sets
    ``_request_switch_mode1`` so the filter can be moved back to mode 1.
    A call that does not change the flag is ignored.
    """
    if enabled != self.enable_no_takeoff_mode:
        self.enable_no_takeoff_mode = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 不起飞模式: {state}")
        request_flag = '_request_apply_mode3' if enabled else '_request_switch_mode1'
        setattr(self, request_flag, True)
def set_no_takeoff_logout_interval(self, min_m, max_m):
    """Store the random interval, in minutes, between no-takeoff re-logins.

    ``None`` counts as 0, negative values are clamped to 0, and the maximum
    is raised to the minimum when it is smaller. Values that cannot be
    converted to float reset both bounds to 0. Nothing is logged when the
    stored bounds do not change.
    """
    try:
        low = 0.0 if min_m is None else float(min_m)
        high = 0.0 if max_m is None else float(max_m)
    except (TypeError, ValueError):
        low = high = 0.0
    else:
        low = max(0.0, low)
        high = max(0.0, high)
        if high < low:
            high = low
    unchanged = self._no_takeoff_logout_min == low and self._no_takeoff_logout_max == high
    if unchanged:
        return
    self._no_takeoff_logout_min = low
    self._no_takeoff_logout_max = high
    if low == 0 and high == 0:
        self.log(">>> [配置] 不起飞模式小退间隔随机范围: 关闭")
        return
    self.log(f">>> [配置] 不起飞模式小退间隔随机范围: {low}-{high} 分钟")
def set_filter_switch_interval(self, min_s, max_s):
    """Store the random interval, in seconds, for flipping the filter when idle.

    ``None`` falls back to 3.0 / 6.0, the minimum is clamped to at least 0.5
    and the maximum to at least the minimum. Values that cannot be converted
    to float reset the pair to 3.0 / 6.0. Nothing is logged when the stored
    bounds do not change.
    """
    try:
        low = 3.0 if min_s is None else float(min_s)
        high = 6.0 if max_s is None else float(max_s)
    except (TypeError, ValueError):
        low, high = 3.0, 6.0
    else:
        low = max(0.5, low)
        high = max(low, high)
    unchanged = self._filter_switch_min == low and self._filter_switch_max == high
    if unchanged:
        return
    self._filter_switch_min = low
    self._filter_switch_max = high
    self.log(f">>> [配置] 无任务来回切换间隔随机范围: {low}-{high} 秒")
def set_cancel_stand_filter_when_tower_off(self, enabled):
    """Toggle the "cancel stand filter when the tower is off" option.

    A call that does not change the flag is ignored; a change is logged.
    """
    if enabled != self.enable_cancel_stand_filter:
        self.enable_cancel_stand_filter = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 塔台关闭时取消停机位筛选: {state}")
def _color_diff(self, a, b):
return sum(abs(int(a[i]) - int(b[i])) for i in range(3))
def _is_pixel_light(self, screen, x, y):
try:
b, g, r = screen[y, x]
return self._color_diff((b, g, r), self.COLOR_LIGHT) < 80
except Exception:
return False
def _is_pixel_dark(self, screen, x, y):
try:
b, g, r = screen[y, x]
return self._color_diff((b, g, r), self.COLOR_DARK) < 80
except Exception:
return False
def _is_tower_off(self, screen):
"""四个检测点全部为灰色才表示塔台关闭"""
tb, tg, tr = self.TOWER_OFF_COLOR
for (x, y) in self.TOWER_CHECK_POINTS:
try:
b, g, r = screen[y, x]
if self._color_diff((b, g, r), (tb, tg, tr)) > 70:
return False
except Exception:
return False
return True
def _is_tower_icon_visible(self):
"""检测塔台图标是否可见(ROI 内匹配 tower.png)"""
return self.safe_locate('tower.png', confidence=0.8, region=self.TOWER_ICON_ROI) is not None
def _is_point_red(self, b, g, r):
"""检测点是否为红色(需延时):R 主导,与绿/灰区分"""
rb, rg, rr = self.TOWER_RED_BGR
diff_red = self._color_diff((b, g, r), (rb, rg, rr))
diff_green = self._color_diff((b, g, r), self.TOWER_GREEN_BGR)
diff_gray = self._color_diff((b, g, r), self.TOWER_OFF_COLOR)
return diff_red < 90 and diff_red <= diff_green and diff_red <= diff_gray
def _matches_filter_mode(self, screen, points_config):
for (x, y), want_light in points_config:
if want_light:
if not self._is_pixel_light(screen, x, y):
return False
else:
if not self._is_pixel_dark(screen, x, y):
return False
return True
def _matches_filter_mode3(self, screen):
"""不起飞模式:菜单深色、(1542,474)深色,(1535,118)与(1542,190)有且仅有一个为深色,(1533,331)(1537,403)为浅色"""
mx, my = self.FILTER_MENU_BTN
if not self._is_pixel_dark(screen, mx, my):
return False
if not self._is_pixel_dark(screen, 1542, 474):
return False
if not self._is_pixel_light(screen, 1533, 331):
return False
if not self._is_pixel_light(screen, 1537, 403):
return False
a_dark = self._is_pixel_dark(screen, self.FILTER_POINT_A[0], self.FILTER_POINT_A[1])
b_dark = self._is_pixel_dark(screen, self.FILTER_POINT_B[0], self.FILTER_POINT_B[1])
return (a_dark and not b_dark) or (not a_dark and b_dark)
def _do_filter_switch(self):
"""在(1535,118)与(1542,190)之间切换:点击当前非当前侧,使该侧变深、另一侧变浅"""
if self._filter_side == 0:
x, y = self.FILTER_POINT_B[0], self.FILTER_POINT_B[1]
else:
x, y = self.FILTER_POINT_A[0], self.FILTER_POINT_A[1]
self._click_filter_point(x, y)
self.sleep(0.3)
self._filter_side = 1 - self._filter_side
self._filter_side_switch_time = time.time()
self._filter_no_pending_switch = False
#self.log(f"📋 [筛选] 不起飞模式:切换至{'进场' if self._filter_side == 0 else '停机位'}侧")
def _do_no_takeoff_small_logout(self):
"""不起飞模式小退:点击主界面 -> 等待0.5s -> 点击换机场 -> 等待4s -> 点击 first_start_2(30s内) -> 等待10s -> 等待主界面(90s内)"""
self._check_running()
loc = self.safe_locate('main_interface.png', region=self.REGION_MAIN_ANCHOR, confidence=0.8)
if not loc:
self.log("📋 [小退] 未找到主界面按钮,跳过本次小退")
return
self.adb.click(loc[0], loc[1], random_offset=5)
self.sleep(0.5)
if not self.find_and_click('change_airport.png', confidence=0.75, wait=0):
self.log("📋 [小退] 未找到更改机场按钮,跳过")
return
self.sleep(4.0)
t0 = time.time()
found_fs2 = False
while time.time() - t0 < 30.0:
self._check_running()
if self.find_and_click('first_start_2.png', wait=0.5):
found_fs2 = True
break
self.sleep(0.5)
if not found_fs2:
self.log("📋 [小退] 30s 内未找到开始按钮,继续等待主界面")
self.sleep(10.0)
wait_main = time.time()
while time.time() - wait_main < 90.0:
self._check_running()
if self.safe_locate('main_interface.png', region=self.REGION_MAIN_ANCHOR, confidence=0.8):
self.log("📋 [小退] 已返回主界面,恢复处理")
self.adb.click(1160, 41)
self.sleep(0.5)
return
self.sleep(1.0)
self.log("📋 [小退] 90s 内未检测到主界面,交由后续流程处理")
def _force_switch_filter_mode1(self):
"""在主循环中强制将筛选状态切回模式1(仅待处理)"""
# 仅在主界面下尝试
if not self.safe_locate('main_interface.png', region=self.REGION_MAIN_ANCHOR, confidence=0.8):
return
screen = self.adb.get_screenshot()
if screen is None:
return
mx, my = self.FILTER_MENU_BTN
# 确保筛选菜单已展开
if self._is_pixel_light(screen, mx, my):
self._click_filter_point(mx, my)
self.sleep(0.5)
for _ in range(5):
screen = self.adb.get_screenshot()
if screen is None:
break
if self._is_pixel_dark(screen, mx, my):
break
self._click_filter_point(mx, my)
self.sleep(0.3)
screen = self.adb.get_screenshot()
if screen is None:
return
# 若已是模式1则不动,否则按模式1配置逐项修正
if self._matches_filter_mode(screen, self.FILTER_CHECK_POINTS_MODE1):
return
self.log("📋 [筛选] 关闭不起飞模式,强制切换至模式1(仅待处理)...")
for (x, y), want_light in self.FILTER_CHECK_POINTS_MODE1:
screen = self.adb.get_screenshot()
if screen is None:
break
is_light = self._is_pixel_light(screen, x, y)
if (want_light and not is_light) or (not want_light and is_light):
self._click_filter_point(x, y)
self.sleep(0.2)
def _click_filter_point(self, x, y):
self.adb.click(x, y, random_offset=5)
def _periodic_15s_check(self, force_initial_filter_check=False):
    """Periodic health check: main-interface heartbeat, reward recovery, filter state.

    Runs at most once every 15 seconds unless *force_initial_filter_check*
    is true. It performs three steps:

    1. Refresh ``last_seen_main_interface_time`` when the main interface is seen.
    2. Click any button from ``REWARD_FLOW_BUTTONS`` found inside
       ``REGION_REWARD_RECOVERY`` (up to six rounds).
    3. While on the main interface, expand the filter menu and bring the
       filter toggles into the mode required by the current settings.

    NOTE(review): the source reached review without indentation; the nesting
    below was reconstructed from the statement order — confirm against the
    original file.
    """
    # The timestamp attribute is created lazily on the first call.
    if not hasattr(self, 'last_periodic_check_time'):
        self.last_periodic_check_time = 0
    now = time.time()
    if not force_initial_filter_check and now - self.last_periodic_check_time < 15.0:
        return
    self.last_periodic_check_time = now
    # 1. Detect the main interface
    if self.safe_locate('main_interface.png', region=self.REGION_MAIN_ANCHOR, confidence=0.8):
        self.last_seen_main_interface_time = time.time()
    # 2. Anti-stuck for the central reward area: whether or not we are on the main interface,
    #    keep looking for reward-flow buttons and clicking them until none is found
    rx, ry, rw, rh = self.REGION_REWARD_RECOVERY
    for _ in range(6):
        screen = self.adb.get_screenshot()
        if screen is None:
            break
        roi = screen[ry:ry + rh, rx:rx + rw]
        clicked = False
        for btn in self.REWARD_FLOW_BUTTONS:
            res = self.adb.locate_image(self.icon_path + btn, confidence=0.65, screen_image=roi)
            if res:
                self.log(f"🚨 [15s周期检测] 在领奖区域内发现 {btn},点击恢复...")
                # The match is relative to the cropped ROI; add the ROI origin back.
                self.adb.click(res[0] + rx, res[1] + ry, random_offset=3)
                self.sleep(1.0)
                clicked = True
                break
        if not clicked:
            break
    # 3. Filter-state check (only when the main interface is confirmed)
    if not self.safe_locate('main_interface.png', region=self.REGION_MAIN_ANCHOR, confidence=0.8):
        return
    screen = self.adb.get_screenshot()
    if screen is None:
        return
    mx, my = self.FILTER_MENU_BTN
    # A light menu button is treated as "menu collapsed": click until it turns dark.
    if self._is_pixel_light(screen, mx, my):
        self.log("📋 [筛选] 展开筛选菜单...")
        self._click_filter_point(mx, my)
        self.sleep(0.5)
        for _ in range(5):
            screen = self.adb.get_screenshot()
            if screen is None:
                break
            if self._is_pixel_dark(screen, mx, my):
                break
            self._click_filter_point(mx, my)
            self.sleep(0.3)
    screen = self.adb.get_screenshot()
    if screen is None:
        return
    is_mode1 = self._matches_filter_mode(screen, self.FILTER_CHECK_POINTS_MODE1)
    is_mode2 = self._matches_filter_mode(screen, self.FILTER_CHECK_POINTS_MODE2)
    is_mode3 = self.enable_no_takeoff_mode and self._matches_filter_mode3(screen)

    def apply_mode(points_config):
        """Click every toggle whose light/dark state differs from *points_config*."""
        for (x, y), want_light in points_config:
            # Re-capture before each toggle, because the previous click changed the panel.
            screen = self.adb.get_screenshot()
            if screen is None:
                break
            is_light = self._is_pixel_light(screen, x, y)
            if (want_light and not is_light) or (not want_light and is_light):
                self._click_filter_point(x, y)
                self.sleep(0.2)

    def apply_mode3():
        """Ensure the menu and (1542,474) are dark and (1533,331)(1537,403) are light, then make the side chosen by _filter_side dark."""
        # At most eight corrective clicks; each pass fixes the first mismatch it finds.
        for _ in range(8):
            screen = self.adb.get_screenshot()
            if screen is None:
                return
            if self._is_pixel_light(screen, mx, my):
                self._click_filter_point(mx, my)
                self.sleep(0.3)
                continue
            if not self._is_pixel_dark(screen, 1542, 474):
                self._click_filter_point(1542, 474)
                self.sleep(0.2)
                continue
            if not self._is_pixel_light(screen, 1533, 331):
                self._click_filter_point(1533, 331)
                self.sleep(0.2)
                continue
            if not self._is_pixel_light(screen, 1537, 403):
                self._click_filter_point(1537, 403)
                self.sleep(0.2)
                continue
            ax, ay = self.FILTER_POINT_A[0], self.FILTER_POINT_A[1]
            bx, by = self.FILTER_POINT_B[0], self.FILTER_POINT_B[1]
            a_dark = self._is_pixel_dark(screen, ax, ay)
            b_dark = self._is_pixel_dark(screen, bx, by)
            # _filter_side == 0 means point A should be the dark one.
            want_a_dark = self._filter_side == 0
            if want_a_dark and not a_dark:
                self._click_filter_point(ax, ay)
                self.sleep(0.2)
            elif not want_a_dark and not b_dark:
                self._click_filter_point(bx, by)
                self.sleep(0.2)
            else:
                break

    # No-takeoff mode takes priority: when enabled, force mode 3 regardless of the tower being closed
    if self.enable_no_takeoff_mode:
        if not is_mode3:
            self.log("📋 [筛选] 应用不起飞模式(动态筛选)...")
            apply_mode3()
            self._filter_side_switch_time = time.time()
        return
    need_mode1_only = self._tower_off_force_mode1
    if need_mode1_only and not is_mode1:
        self.log("📋 [筛选] 切换至仅待处理... (塔台已关闭)")
        apply_mode(self.FILTER_CHECK_POINTS_MODE1)
        return
    # Either known mode is acceptable when nothing forces mode 1.
    if is_mode1 or is_mode2:
        return
    self.log("📋 [筛选] 状态异常,默认切换至仅待处理...")
    apply_mode(self.FILTER_CHECK_POINTS_MODE1)
def _nemu_ipc_debug_save_mismatch(self, nemu_img, adb_img):
    """Save the nemu_ipc and ADB screenshots side by side when they disagree.

    Both images are written into a ``nemu_ipc_debug`` folder with a shared
    timestamp so they can be compared. Best effort: any failure is logged
    and swallowed.

    Args:
        nemu_img: Screenshot captured through nemu_ipc.
        adb_img: Screenshot captured through ADB.
    """
    try:
        import datetime
        # Frozen builds write next to the executable, like the other debug
        # writers in this class; there __file__ points inside the bundle's
        # unpack directory rather than the install folder.
        if getattr(sys, "frozen", False):
            base = os.path.dirname(sys.executable)
        else:
            base = os.path.dirname(os.path.abspath(__file__))
        debug_dir = os.path.join(base, "nemu_ipc_debug")
        os.makedirs(debug_dir, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        p_n = os.path.join(debug_dir, f"mismatch_nemu_{ts}.png")
        p_a = os.path.join(debug_dir, f"mismatch_adb_{ts}.png")
        save_image_safe(p_n, nemu_img)
        save_image_safe(p_a, adb_img)
        self.log(f"📋 [调试] 已保存对比图: {p_n} / {p_a}")
    except Exception as e:
        self.log(f"📋 [调试] 保存对比图失败: {e}")
def _droidcast_raw_debug_save_mismatch(self, droidcast_img, adb_img):
    """Save the DroidCast_raw and ADB screenshots side by side when they disagree.

    Both images are written into a ``droidcast_raw_debug`` folder with a
    shared timestamp so they can be compared. Best effort: any failure is
    logged and swallowed.

    Args:
        droidcast_img: Screenshot captured through DroidCast_raw.
        adb_img: Screenshot captured through ADB.
    """
    try:
        import datetime
        # Frozen builds write next to the executable, like the other debug
        # writers in this class; there __file__ points inside the bundle's
        # unpack directory rather than the install folder.
        if getattr(sys, "frozen", False):
            base = os.path.dirname(sys.executable)
        else:
            base = os.path.dirname(os.path.abspath(__file__))
        debug_dir = os.path.join(base, "droidcast_raw_debug")
        os.makedirs(debug_dir, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        p_d = os.path.join(debug_dir, f"mismatch_droidcast_{ts}.png")
        p_a = os.path.join(debug_dir, f"mismatch_adb_{ts}.png")
        save_image_safe(p_d, droidcast_img)
        save_image_safe(p_a, adb_img)
        self.log(f"📋 [调试] 已保存对比图: {p_d} / {p_a}")
    except Exception as e:
        self.log(f"📋 [调试] 保存对比图失败: {e}")
def _save_list_roi_debug(self, full_screen, list_roi_img, lx, ly, lw, lh):
    """Dump the full screenshot and the task-list ROI for offline inspection.

    Files go to a ``list_detect_debug`` folder next to the executable
    (frozen build) or next to this source file. *lx*, *ly*, *lw* and *lh*
    are accepted but not used here. Failures are logged and swallowed.

    NOTE(review): the original docstring says this is used when task-list
    detection finds nothing and WOA_DEBUG=1 or LIST_DETECT_DEBUG=1; that
    gating is not visible in this method.
    """
    try:
        if getattr(sys, "frozen", False):
            base = os.path.dirname(sys.executable)
        else:
            base = os.path.dirname(os.path.abspath(__file__))
        debug_dir = os.path.join(base, "list_detect_debug")
        os.makedirs(debug_dir, exist_ok=True)
        ts = time.strftime("%Y%m%d_%H%M%S")
        outputs = (
            (f"list_debug_full_{ts}.png", full_screen),
            (f"list_debug_roi_{ts}.png", list_roi_img),
        )
        # save_image_safe is the writer used for paths with non-ASCII characters.
        for filename, image in outputs:
            save_image_safe(os.path.join(debug_dir, filename), image)
        self.log(f"📋 [调试] 任务列表调试图已保存至: {debug_dir}")
    except Exception as e:
        self.log(f"📋 [调试] 保存 list_detect 截图失败: {e}")
def _run_pending_detection(self, list_roi_img):
"""按行识别:每行只保留该行内置信度最高的类型,避免跨行竞争导致相似图标误判。"""
base_defs = [
('pending_ice.png', self.handle_ice_task, 0.8, 'ice'),
('pending_repair.png', self.handle_repair_task, 0.8, 'repair'),
('pending_doing.png', self.handle_vehicle_check_task, 0.85, 'doing'),
('pending_approach.png', self.handle_approach_task, 0.8, 'approach'),
('pending_taxiing.png', self.handle_taxiing_task, 0.8, 'taxiing'),
('pending_takeoff.png', self.handle_takeoff_task, 0.8, 'takeoff'),
('pending_stand.png', self.handle_stand_task, 0.8, 'stand')
]
try:
conf_override = float(os.environ.get("LIST_DETECT_CONF", "0"))
except ValueError:
conf_override = 0
task_defs = [(n, h, conf_override if conf_override > 0 else c, t) for n, h, c, t in base_defs]
ROW_HEIGHT = 24
all_matches = []
for img_name, handler, conf, t_type in task_defs:
found = self._fast_locate_all(list_roi_img, img_name, confidence=conf)
for item in found:
rel_cx, rel_cy = item['center']
abs_cx = rel_cx + self.LIST_ROI_X
abs_cy = rel_cy
type_for_logic = 'stand' if t_type == 'stand' else ('doing' if t_type == 'doing' else 'other')
all_matches.append({
'y': rel_cy, 'center': (abs_cx, abs_cy), 'handler': handler, 'name': img_name,
'score': item['score'], 'type': type_for_logic, 'raw_type': t_type
})
def _same_row(y1, y2):
return abs(y1 - y2) <= ROW_HEIGHT
final_tasks = []
used = [False] * len(all_matches)
for i, det in enumerate(all_matches):
if used[i]:
continue
cy = det['y']
same_row = [all_matches[j] for j in range(len(all_matches)) if not used[j] and _same_row(all_matches[j]['y'], cy)]
if len(same_row) <= 1:
final_tasks.append(det)
used[i] = True
for j, d in enumerate(all_matches):
if d is det or (not used[j] and _same_row(d['y'], cy)):
used[j] = True
continue
same_row.sort(key=lambda x: x['score'], reverse=True)
best = same_row[0]
final_tasks.append(best)
for j, d in enumerate(all_matches):
if not used[j] and _same_row(d['y'], cy):
used[j] = True
final_tasks.sort(key=lambda t: t['y'])
raw_detections = all_matches
return raw_detections, final_tasks
def _fast_locate_all(self, screen_roi, template_name, confidence=0.8):
if template_name not in self.task_templates:
return []
template = self.task_templates[template_name]
if template is None: return []
try:
res = cv2.matchTemplate(screen_roi, template, cv2.TM_CCOEFF_NORMED)
except Exception:
return []
h, w = template.shape[:2]
loc = np.where(res >= confidence)
found_items = []
for pt in zip(*loc[::-1]):
score = res[pt[1], pt[0]]
is_duplicate = False
for item in found_items:
if abs(pt[0] - item['box'][0]) < 10 and abs(pt[1] - item['box'][1]) < 10:
if score > item['score']:
item['score'] = score
item['box'] = (pt[0], pt[1], w, h)
item['center'] = (pt[0] + w // 2, pt[1] + h // 2)
is_duplicate = True
break
if not is_duplicate:
found_items.append({
'box': (pt[0], pt[1], w, h),
'center': (pt[0] + w // 2, pt[1] + h // 2),
'score': score
})
return found_items
def set_thinking_time_mode(self, mode_index, log_change=True):
    """Select the "thinking time" preset and push it to the ADB controller.

    Modes 1, 2 and 3 map to the ranges (0.1, 0.4), (0.3, 1.0) and
    (0.8, 2.0); any other value turns the delay off with (0, 0). A call
    with the mode that is already active returns immediately. The change is
    logged only when the preset description differs from the last one
    recorded and *log_change* is true.
    """
    mode_index = int(mode_index)
    if hasattr(self, 'thinking_mode') and self.thinking_mode == mode_index:
        return
    self.thinking_mode = mode_index
    presets = {
        1: ((0.1, 0.4), "短 (0.1s-0.4s)"),
        2: ((0.3, 1.0), "中 (0.3s-1.0s)"),
        3: ((0.8, 2.0), "长 (0.8s-2.0s)"),
    }
    self.thinking_range, desc = presets.get(self.thinking_mode, ((0, 0), "关闭"))
    already_announced = getattr(self, '_last_thinking_desc', None) == desc
    if not already_announced:
        self._last_thinking_desc = desc
    # The controller is refreshed in both cases; only the log line is conditional.
    if self.adb:
        self.adb.set_thinking_strategy(*self.thinking_range)
    if already_announced:
        return
    if log_change:
        self.log(f">>> [配置] 思考时间: {desc}")
def set_bonus_staff_feature(self, enabled):
    """Toggle automatic collection of bonus ground staff; logs only on change."""
    if enabled != self.enable_bonus_staff:
        self.enable_bonus_staff = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 自动领取地勤: {state}")
def set_vehicle_buy_feature(self, enabled):
    """Toggle automatic vehicle purchase; logs only on change."""
    if enabled != self.enable_vehicle_buy:
        self.enable_vehicle_buy = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 自动购买车辆: {state}")
def set_speed_mode(self, enabled):
    """Toggle speed mode (skip the second verification); logs only on change."""
    if enabled != self.enable_speed_mode:
        self.enable_speed_mode = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 跳过二次校验: {state}")
def set_skip_staff_verify(self, enabled):
    """Toggle skipping of ground-staff verification; logs only on change."""
    if enabled != self.enable_skip_staff:
        self.enable_skip_staff = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 跳过地勤验证: {state}")
def set_auto_delay(self, count):
    """Store the auto-delay count, converted with ``int()``.

    Raises:
        ValueError or TypeError: when *count* cannot be converted to int.
    """
    self.auto_delay_count = int(count)
def set_delay_bribe(self, enabled):
    """Toggle bribing for delayed aircraft; logs only on change."""
    if enabled != self.enable_delay_bribe:
        self.enable_delay_bribe = enabled
        state = '已开启' if enabled else '已关闭'
        self.log(f">>> [配置] 延误飞机贿赂: {state}")
def set_slide_duration_range(self, min_d, max_d, log_change=True):
    """Set the random slider duration bounds (logged in milliseconds).

    Both values are converted with ``int()``. Nothing happens when the
    stored bounds already equal the new ones. The change is logged unless
    *log_change* is false.
    """
    low, high = int(min_d), int(max_d)
    missing = object()
    current = (
        getattr(self, 'slide_min_duration', missing),
        getattr(self, 'slide_max_duration', missing),
    )
    if current == (low, high):
        return
    self.slide_min_duration = low
    self.slide_max_duration = high
    if log_change:
        self.log(f">>> [配置] 滑块随机耗时: {self.slide_min_duration}ms - {self.slide_max_duration}ms")
def set_device(self, device_serial):
    """Remember the serial of the device that ``start()`` will connect to."""
    self.target_device = device_serial
def set_control_method(self, method):
    """Select the touch-control backend.

    The name is lower-cased. Empty or unrecognised values fall back to
    ``"adb"``. Accepted names are ``adb``, ``minitouch`` and
    ``uiautomator2``.
    """
    requested = (method or "adb").lower()
    chosen = requested if requested in ("adb", "minitouch", "uiautomator2") else "adb"
    if chosen != self.control_method:
        self.control_method = chosen
def set_screenshot_method(self, method):
    """Select the screenshot backend.

    The name is lower-cased. Empty or unrecognised values fall back to
    ``"adb"``. Accepted names are ``adb``, ``nemu_ipc``, ``uiautomator2``
    and ``droidcast_raw``.
    """
    requested = (method or "adb").lower()
    known = ("adb", "nemu_ipc", "uiautomator2", "droidcast_raw")
    chosen = requested if requested in known else "adb"
    if chosen != self.screenshot_method:
        self.screenshot_method = chosen
def set_mumu_path(self, path):
    """Store the MuMu folder (whitespace-stripped; ``None`` becomes ``""``).

    The value is also forwarded to the ADB controller when one exists.
    """
    cleaned = (path or "").strip()
    self.mumu_path = cleaned
    controller = self.adb
    if controller:
        controller.set_mumu_path(self.mumu_path)
def log(self, message):
    """Emit *message* to stdout and the log callback, then run the stall watchdog.

    Empty or whitespace-only messages are dropped. Output is best effort:
    errors from ``print`` or from the callback are ignored, except
    ``KeyboardInterrupt`` and ``SystemExit``.

    Watchdog: a message containing "⚠️" or "超时" increments
    ``consecutive_timeout_count``; one containing "✅" or "成功" resets it.
    Once the count exceeds 3, an emergency reward recovery is attempted,
    unless one ran within the last 10 seconds, in which case the count is
    just reset.

    Args:
        message: Text to log. Non-string objects (for example exceptions)
            are accepted; keyword checks use ``str(message)``, while
            ``print`` and the callback still receive the original object.
    """
    if not message:
        return
    # Keyword checks below need text: a substring test against a non-string
    # object (an exception, an int) would raise TypeError.
    text = str(message)
    if not text.strip():
        return
    try:
        print(message)
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception:
        pass
    if self.log_callback:
        try:
            self.log_callback(message)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            pass
    # Stall watchdog
    # 1. Ignore the recovery log lines themselves so they cannot re-trigger recovery.
    if "防卡死" in text:
        return
    # 2. Count warnings; any success message clears the streak.
    if "⚠️" in text or "超时" in text:
        self.consecutive_timeout_count += 1
    elif "✅" in text or "成功" in text:
        self.consecutive_timeout_count = 0
    # 3. Trigger: more than three warnings in a row and the cooldown has elapsed.
    if self.consecutive_timeout_count > 3:
        if time.time() - self.last_recovery_time > 10:
            self.log("🚨 [防卡死] 检测到连续多次卡顿,尝试紧急寻找领奖图标...")
            self.consecutive_timeout_count = 0
            self.last_recovery_time = time.time()
            self._attempt_emergency_reward_recovery()
        else:
            self.consecutive_timeout_count = 0  # still cooling down: just reset
def _attempt_emergency_reward_recovery(self):
# 全屏搜索领奖图标
targets = ['get_award_1.png', 'get_award_2.png', 'get_award_3.png', 'get_award_4.png']
# 尝试循环检测3次,确保如果点到第1步能接着点第2步
for _ in range(3):
clicked = False
for t in targets:
# 使用 region=None 进行全屏搜索,降低一点阈值以防图标变灰或变暗
res = self.safe_locate(t, confidence=0.65, region=None)
if res:
self.log(f" -> 🚨 紧急恢复:点击 {t}")
self.adb.click(res[0], res[1])
self.sleep(1.5) # 点击后多等一会儿
clicked = True
break
if not clicked:
break
def wait_and_click(self, image_name, timeout=3.0, click_wait=0.2, confidence=0.8, random_offset=5):
    """Poll the screen until *image_name* appears, then click it.

    When the template has an entry in ``ICON_ROIS`` only that region of the
    screenshot is searched, and the match is translated back to screen
    coordinates before clicking.

    Args:
        image_name: Template file name, relative to ``icon_path``.
        timeout: Seconds to keep polling.
        click_wait: Seconds to sleep after a successful click (skipped when <= 0).
        confidence: Match threshold passed to ``locate_image``.
        random_offset: Passed through to ``adb.click``.

    Returns:
        True when the template was found and clicked, False on timeout.

    Raises:
        StopSignal: propagated from ``_check_running()`` when the bot stops.
    """
    self._check_running()
    began = time.time()
    if image_name in self.ICON_ROIS:
        left, top, width, height = self.ICON_ROIS[image_name]
        restricted = True
    else:
        left = top = width = height = 0
        restricted = False
    while time.time() - began < timeout:
        self._check_running()
        frame = self.adb.get_screenshot()
        if frame is None:
            time.sleep(0.1)
            continue
        if restricted:
            haystack = frame[top:top + height, left:left + width]
            shift_x, shift_y = left, top
        else:
            haystack = frame
            shift_x, shift_y = 0, 0
        match = self.adb.locate_image(self.icon_path + image_name, confidence=confidence, screen_image=haystack)
        if not match:
            time.sleep(0.1)
            continue
        self._check_running()
        self.adb.click(match[0] + shift_x, match[1] + shift_y, random_offset=random_offset)
        if click_wait > 0:
            self.sleep(click_wait)
        return True
    return False
def start(self):
    """Connect to the selected device, validate the screen and launch the worker thread.

    Returns early (with a log line) when the bot is already running, when no
    device has been selected, when no screenshot can be captured, or when
    the frame is not 1600x900. On any failure ``self.running`` is reset to
    False and the ADB controller is closed on a best-effort basis.

    NOTE(review): the source reached review without indentation; the nesting
    below (in particular which statements belong to the WOA_DEBUG branch)
    was reconstructed from the statement order — confirm against the
    original file.
    """
    if self.running: return
    self.stand_skip_index = 0
    self.in_staff_shortage_mode = False
    self.last_checked_avail_staff = -1
    self.last_window_close_time = time.time()
    # Reset the watchdog counters
    self.consecutive_timeout_count = 0
    self.consecutive_errors = 0
    self.last_recovery_time = 0
    # Reset the tower state
    self._tower_disabled = False
    self._tower_was_active = False
    self._tower_off_force_mode1 = False
    self._tower_delay_deadline = 0.0
    self._tower_active_slots = [False, False, False, False]
    if not self.target_device:
        self.log("❌ 未选择设备!")
        return
    # Marked running before connecting; every failure path below resets it.
    self.running = True
    self.log(f">>> 连接设备: {self.target_device} ...")
    try:
        self.adb = AdbController(
            target_device=self.target_device,
            control_method=self.control_method,
            screenshot_method=self.screenshot_method,
            instance_id=self.instance_id,
        )
        self.adb.set_mumu_path(self.mumu_path)
        if self.config_callback:
            # Lets the controller report a folder back as the "mumu_path" setting.
            self.adb.set_nemu_folder_callback(
                lambda folder: self.config_callback("mumu_path", folder)
            )
        self.adb.set_thinking_strategy(*self.thinking_range)
        self.ocr = SimpleOCR(self.adb, self.icon_path)
        self.log("✅ OCR 模块已加载")
        # Smoke test: a frame must be obtainable and have the expected size.
        test_img = self.adb.get_screenshot()
        if test_img is None:
            self.log("❌ 连接成功但无法获取画面!")
            self.running = False
            try:
                if self.adb:
                    self.adb.close()
            except Exception:
                pass
            return
        h, w = test_img.shape[:2]
        # The bot's hard-coded coordinates require exactly 1600x900.
        if w != 1600 or h != 900:
            self.log(f"🛑 分辨率错误:{w}x{h} (必须 1600x900)")
            self.running = False
            try:
                if self.adb:
                    self.adb.close()
            except Exception:
                pass
            return
        self.log(f"✅ 画面正常,脚本启动")
        ctrl_map = {"adb": "ADB", "minitouch": "minitouch", "uiautomator2": "uiautomator2"}
        ctrl = ctrl_map.get(self.adb.control_method, "ADB")
        shot = self.adb.screenshot_method if self.adb.screenshot_method != "adb" else "ADB"
        self.log(f">>> [模式] 触控: {ctrl}, 截图: {shot}")
        # Optional one-off self-test of all screenshot/touch methods (WOA_DEBUG=1/true/yes).
        if os.environ.get("WOA_DEBUG", "").strip().lower() in ("1", "true", "yes"):
            try:
                debug_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "woa_debug") if not getattr(sys, "frozen", False) else os.path.join(os.path.dirname(sys.executable), "woa_debug")
                self.log(f">>> [WOA_DEBUG] 已开启,仅在启动时执行方案测试,结果保存至: {debug_dir}")
            except Exception:
                self.log(">>> [WOA_DEBUG] 已开启,仅在启动时执行方案测试")
            self.log(">>> [WOA_DEBUG] 正在进行截图与触控方案测试...")
            self.adb.run_all_method_tests()
            woa_debug_set_runtime_started()
            self.log(">>> [WOA_DEBUG] 方案测试完成,开始主循环")
            # Remove the variable so the self-test does not run again in this process.
            os.environ.pop("WOA_DEBUG", None)
        # Daemon thread: it will not keep the process alive on exit.
        thread = threading.Thread(target=self._main_loop)
        thread.daemon = True
        self._worker_thread = thread
        thread.start()
    except Exception as e:
        self.log(f"❌ 启动失败: {e}")
        self.running = False
        try:
            if hasattr(self, 'adb') and self.adb:
                self.adb.close()
        except Exception:
            pass
def stop(self):
    """Request a stop, report and persist statistics, and close ADB in the background.

    ``running`` is cleared first so that ``_check_running()`` aborts any
    operation in flight. The controller is closed on a daemon thread, so
    this call does not wait for it.
    """
    self.running = False
    self.log(">>> 正在停止脚本...")
    self._print_session_stats()
    self._save_stats_to_csv()
    self.next_bonus_retry_time = 0
    controller = getattr(self, 'adb', None)
    if not controller:
        return
    closer = threading.Thread(target=self._async_close_adb, args=(controller,), daemon=True)
    closer.start()
def _print_session_stats(self):
start = getattr(self, "_run_start_time", None)
if start is not None:
secs = max(0, int(time.time() - start))
h, rest = divmod(secs, 3600)
m, s = divmod(rest, 60)
if h > 0:
dur = f"{h}小时{m}分{s}秒"
else:
dur = f"{m}分{s}秒"
self.log(f"[统计] 本次运行时长: {dur}")
a = getattr(self, "_stat_session_approach", self._stat_approach)
d = getattr(self, "_stat_session_depart", self._stat_depart)
sc = getattr(self, "_stat_session_stand_count", self._stat_stand_count)
ss = getattr(self, "_stat_session_stand_staff", self._stat_stand_staff)
if a + d + sc == 0:
return
self.log(f"[统计] ═══════════════════════════════════")
self.log(f"[统计] ✈ 进场飞机: {a} 架次")
self.log(f"[统计] ✈ 离场飞机: {d} 架次")
self.log(f"[统计] ✈ 分配地勤: {sc} 架次 / {ss} 人次")
self.log(f"[统计] ═══════════════════════════════════")
def _add_stats_to_csv_date(self, target_date, a, d, sc, ss):
"""将 (a,d,sc,ss) 累加到 CSV 中 target_date 所在行。若 a+d+sc==0 则不写。
所有实例共用同一个 woa_stats.csv,使用文件锁防止并发写入冲突。"""
import csv
if a + d + sc == 0:
return
try:
base_dir = os.path.dirname(os.path.abspath(__file__)) if not getattr(sys, "frozen", False) else os.path.dirname(sys.executable)
csv_path = os.path.join(base_dir, "woa_stats.csv")
lock_path = csv_path + ".lock"
header = ["date", "approach", "depart", "stand_count", "stand_staff"]
# 使用文件锁保证多实例安全
import msvcrt
with open(lock_path, "w") as lf:
lf.write("1")
lf.flush()
msvcrt.locking(lf.fileno(), msvcrt.LK_LOCK, 1)
try:
rows = []
if os.path.isfile(csv_path):
with open(csv_path, "r", encoding="utf-8-sig") as f:
reader = csv.reader(f)
for i, row in enumerate(reader):
if i == 0 and row and row[0].strip().lower() == "date":
continue
if len(row) >= 5:
rows.append(row)
found = False
for row in rows:
if row[0] == target_date:
row[1] = str(int(row[1]) + a)
row[2] = str(int(row[2]) + d)
row[3] = str(int(row[3]) + sc)
row[4] = str(int(row[4]) + ss)
found = True
break
if not found:
rows.append([target_date, str(a), str(d), str(sc), str(ss)])
with open(csv_path, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)