52. Рефакторинг SomeServiceImpl для асинхронности и корректности #
Дан код. Надо найти ошибки в классе и поправить. Дан высоконагруженный микросервис. Запущено в k8s 8 подов (реплик). Цель кода вычитать из БД записи и отправить в сервис аналитики. За это отвечает метод processJobRunning(). И вторая задача кода: отправить чеки в налоговую. Снаружи из другого нашего микросервиса приходит запрос sendReceipt(). Объект taxClient наподобие feign client. Налоговая может отвечать больше 10 сек, а нам нужно работать быстро. Как это порешать? Там, где написано “граница класса”, там ниже есть DTO-шки, репозитории, которые помогут прочитать код. И там ошибки не надо искать
Условие задачи:
📌 Даны два метода в высоконагруженном Spring‑микросервисе:
processJobRunning()
— раз в час читает необработанные возвратные чеки и отправляет события в аналитику, затем сохраняет чек.sendReceipt()
— принимает DTO чека и синхронно вызывает внешний Feign‑клиентtaxClient.sendReceipt()
, который может отвечать до 60 секунд.
Нужно:
- Исправить ошибки в текущей реализации (ограничение AOP, некорректный вызов
findById()
, кавычки в@Scheduled
и т.п.). - Обеспечить, чтобы метод
sendReceipt()
не блокировал поток ожиданием ответа налоговой, а работал асинхронно.
Код:
// Микросервис чеков высоконагруженного маркетплейса, десятки тысяч заказов в сутки.
// Основные функции микросервиса - и 30 - сохранение и отправка чеков в налоговую (во внешний независимый сервис)
// (Инфраструктура - Kubernetes 8 подов (реплик), высоконагруженный).
// Две независимые функции (процесса)
// 1. Сохранение в бд (Postgres) сведения о данных в чек и отправка в Налоговую - метод sendReceipt (в реальном времени).
// 2. Сохранение по расписанию (раз в час) чеков возврата - метод processJobRunning.
// TaxClient - клиент налоговой (feign) - timeout = 60 секунд (не наш, не можем повлиять)
@Service
@AllArgsConstructor
public class SomeServiceImpl implements Someservice {
private final TaxClient taxClient;
private final ReceiptDao receiptDao;
private final ReceiptTypeDao receiptTypeDao;
private final ReceiptMapper mapper;
private final ReceiptTypeMapper typeMapper;
private final RefundReceiptMapper refundReceiptMapper;
private final SomeService self;
@Scheduled(cron = ""${cron.expression}"") // ""30 30 * * *""
public void processJobRunning() {
List<Receipt> receipts = getRefundReceipts();
for (Receipt receipt : receipts) {
receipt.setProcessed(true);
eventListener.sendEventToAnalytics(new SomeEvent(receipt.getId()); //send simple event, using org.springframework.context.event, idempotent
receiptDao.save(mapReceipt(mapper.map(receipt)));
}
}
@Transactional(readOnly = true)
public List<Receipt> getRefundReceipts() {
return receiptDao.findAllBySourceAndProcessedFalse(ReceiptSource.REFUND); // select * from receipt where source = 'REFUNC' and processed = false;
}
private Receipt mapReceipt(ReceiptDto receiptDto) {
return mapper.map(receiptDto);
}
//приходит http-запрос из другого микросервиса
public void sendReceipt(ReceiptDto receiptDto) {
ReceiptSource source = self.getReceiptSource(receiptDto);
taxClient.sendReceipt(receiptDto, source);
}
@Transactional
@Cacheable(value = ""receipt_type"", cacheManager = ""redisCacheManager"") // по сути работает как hashMap
public ReceiptSource getReceiptSource(ReceiptDto receiptDto) {
Receipt receipt = receiptDao.findById(receiptDto.getId());
ReceiptSource source = receipt.getSource();
return source;
}
}
// граница класса ----
public interface ReceiptDao extends JpaRepository<Receipt, Long> {
List<Receipt> findAllBySourceAndProcessedFalse(ReceiptSource source);
}
//тут все корректно - для наглядности - mapstruct
@Mapper(componentModel = SPRING)
public interface ReceiptMapper {
Receipt map(ReceiptDto dto);
ReceiptDto map(Receipt receipt);
}
// граница класса ----
// тут все корректно - dto для наглядности
public enum ReceiptSource {
DISCOUNT,
SELL,
REFUND
}
// граница класса ----
//тут все корректно - dto для наглядности
@Data
@NoArgsConstructor
public class ReceiptDto {
private Long id;
private String address;
private String sum;
private ReceiptSource source;
}
// граница класса ----
//тут все корректно - entity для наглядности
//@Data
@Entity
@ALLArgsConstructor
@NoArgsConstructor
@Data
public class Receipt {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private boolean processed;
private String address;
private String sum;
private ReceiptSource source;
}
Спойлеры к решению
Подсказки
💡 В
💡 AOP-прокси Spring с
💡
💡 Чтобы не блокировать поток долгим Feign‑вызовом, сделайте
💡 При сохранении внутри цикла оборачивайте отправку события в
@Scheduled
нужно писать cron = "${cron.expression}"
, а не двойные кавычки внутри.💡 AOP-прокси Spring с
@Cacheable
не срабатывает при вызове this.getReceiptSource(...)
— надо вызывать через биновый прокси (self.getReceiptSource(...)
).💡
findById()
возвращает Optional<Receipt>
— обязательно обрабатывать отсутствие записи.💡 Чтобы не блокировать поток долгим Feign‑вызовом, сделайте
sendReceipt()
асинхронным (@Async
) или отправляйте DTO в очередь (Kafka/RabbitMQ).💡 При сохранении внутри цикла оборачивайте отправку события в
try/catch
, чтобы одна ошибка не прерывала весь процесс.Решение
import java.util.List;
import java.util.concurrent.Executor;
import lombok.AllArgsConstructor;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@AllArgsConstructor
public class SomeServiceImpl implements SomeService {
private final TaxClient taxClient;
private final ReceiptDao receiptDao;
private final SomeEventListener eventListener;
private final SomeService self; // биновый прокси для AOP
@Scheduled(cron = "${cron.expression}")
public void processJobRunning() {
List<Receipt> receipts = getRefundReceipts();
for (Receipt receipt : receipts) {
try {
receipt.setProcessed(true);
eventListener.sendEventToAnalytics(new SomeEvent(receipt.getId()));
receiptDao.save(receipt);
} catch (Exception ex) {
// логируем и продолжаем
log.error("Error processing receipt {}", receipt.getId(), ex);
}
}
}
@Transactional(readOnly = true)
public List<Receipt> getRefundReceipts() {
return receiptDao.findAllBySourceAndProcessedFalse(ReceiptSource.REFUND);
}
public void sendReceipt(ReceiptDto receiptDto) {
// Асинхронный вызов через прокси Spring
self.sendReceiptAsync(receiptDto);
}
@Async("taskExecutor")
public void sendReceiptAsync(ReceiptDto receiptDto) {
ReceiptSource source = getReceiptSource(receiptDto);
taxClient.sendReceipt(receiptDto, source);
}
@Transactional
@Cacheable(value = "receipt_type", cacheManager = "redisCacheManager")
public ReceiptSource getReceiptSource(ReceiptDto receiptDto) {
Receipt receipt = receiptDao.findById(receiptDto.getId())
.orElseThrow(() -> new EntityNotFoundException("Receipt not found: " + receiptDto.getId()));
return receipt.getSource();
}
}