// 大数据推荐模块:利用Spark分析用户历史观影偏好,生成个性化推荐// 此处模拟Spark环境,实际项目中可能作为独立的数据分析服务运行SparkSession spark = SparkSession.builder().appName("MovieRecommender").master("local[*]").getOrCreate();public List<Movie> generateRecommendations(String userId) { // 1. 从MySQL加载用户历史订单数据(包含电影类型) Dataset<Row> userOrders = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/cinema_db").option("dbtable", "(SELECT o.movie_id, m.genre FROM orders o JOIN movies m ON o.movie_id = m.id WHERE o.user_id = '" + userId + "') AS user_genres").option("user", "root").option("password", "password").load(); // 2. 统计用户最喜爱的电影类型 Dataset<Row> favoriteGenres = userOrders.groupBy("genre").count().orderBy(org.apache.spark.sql.functions.desc("count")); String topGenre = favoriteGenres.first().getString(0); // 获取最受欢迎的类型 // 3. 从MySQL加载所有电影数据 Dataset<Row> allMovies = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/cinema_db").option("dbtable", "movies").option("user", "root").option("password", "password").load(); // 4. 筛选出用户未看过的且属于其喜爱类型的电影 Dataset<Row> recommendedMovies = allMovies.join(userOrders, allMovies.col("id").equalTo(userOrders.col("movie_id")), "left_anti").where(allMovies.col("genre").equalTo(topGenre)); // 5. 转换为List<Movie>对象并返回(此处简化,实际需查询数据库获取完整对象) List<String> movieIds = recommendedMovies.select("id").as(Encoders.STRING()).collectAsList(); return movieMapper.selectMoviesByIds(movieIds);}spark.stop();// 核心功能一:选座与锁定public Map<String, Object> selectAndLockSeats(String showId, List<String> seatIds, String userId) { Map<String, Object> result = new HashMap<>(); // 1. 校验请求参数 if (showId == null || seatIds == null || seatIds.isEmpty()) { result.put("code", 400); result.put("msg", "场次或座位信息不能为空"); return result; } // 2. 批量查询座位状态,确保所有座位均可售 List<Seat> seats = seatMapper.selectSeatsByShowIdAndSeatIds(showId, seatIds); for (Seat seat : seats) { if (!"AVAILABLE".equals(seat.getStatus())) { result.put("code", 409); result.put("msg", "座位 " + seat.getSeatCode() + " 已被选或已锁定"); return result; } } // 3. 生成锁定token并设置过期时间(例如15分钟) String lockToken = UUID.randomUUID().toString(); Date lockTime = new Date(); Date expireTime = new Date(lockTime.getTime() + 15 * 60 * 1000); // 4. 批量更新座位状态为LOCKED,并记录锁定信息 seatMapper.batchLockSeats(showId, seatIds, lockToken, userId, lockTime, expireTime); // 5. 返回锁定成功信息及token result.put("code", 200); result.put("msg", "座位锁定成功"); result.put("lockToken", lockToken); result.put("expireTime", expireTime); return result;}// 核心功能二:创建订单与支付public Map<String, Object> createOrder(String lockToken, String userId) { Map<String, Object> result = new HashMap<>(); // 1. 根据lockToken查询锁定信息 SeatLock lock = seatLockMapper.selectByToken(lockToken); if (lock == null || lock.getExpireTime().before(new Date()) || !lock.getUserId().equals(userId)) { result.put("code", 400); result.put("msg", "锁定已失效或token无效"); return result; } // 2. 计算订单总金额 List<Seat> lockedSeats = seatMapper.selectLockedSeatsByToken(lockToken); BigDecimal totalAmount = BigDecimal.ZERO; for (Seat seat : lockedSeats) { totalAmount = totalAmount.add(seat.getPrice()); } // 3. 生成订单号并创建订单记录 String orderNo = "ORD" + System.currentTimeMillis(); Order order = new Order(); order.setOrderNo(orderNo); order.setUserId(userId); order.setShowId(lock.getShowId()); order.setSeatIds(String.join(",", lock.getSeatIds())); order.setAmount(totalAmount); order.setStatus("PENDING_PAYMENT"); orderMapper.insert(order); // 4. 调用支付接口(此处模拟),获取支付链接 String paymentUrl = paymentService.createPayment(orderNo, totalAmount, "影院购票"); // 5. 返回订单信息及支付链接 result.put("code", 200); result.put("msg", "订单创建成功"); result.put("orderNo", orderNo); result.put("paymentUrl", paymentUrl); return result;}// 核心功能三:支付回调处理public String handlePaymentCallback(String orderNo, String paymentStatus) { // 1. 根据订单号查询订单 Order order = orderMapper.selectByOrderNo(orderNo); if (order == null) { return "Order not found"; } // 2. 校验订单状态,防止重复处理 if (!"PENDING_PAYMENT".equals(order.getStatus())) { return "Order already processed"; } // 3. 判断支付结果 if ("SUCCESS".equals(paymentStatus)) { // 支付成功,更新订单状态为已支付 order.setStatus("PAID"); order.setPayTime(new Date()); orderMapper.updateStatus(order); // 更新座位状态为已售出 seatMapper.batchUpdateSeatsStatusToSold(order.getShowId(), Arrays.asList(order.getSeatIds().split(","))); // 发送购票成功通知(可集成微信模板消息) notificationService.sendSuccessNotification(order.getUserId(), orderNo); return "success"; } else { // 支付失败,更新订单状态为已关闭,并释放锁定的座位 order.setStatus("CLOSED"); orderMapper.updateStatus(order); seatService.releaseSeatsLock(order.getSeatIds()); return "failure"; }}