insurance-spider/sqlite.js

321 lines
8.6 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import Database from "better-sqlite3";
import fs from "fs";
// import { wechatPush } from "./utils.js";
/**
 * Decode the JSON `data` column of rows read from the `messages` table.
 *
 * @param {Array<object>} rows - Raw rows as returned by a prepared statement.
 * @returns {Array<object>} Copies of the rows with `data` parsed from JSON.
 * @throws {SyntaxError} If a row's `data` column is not valid JSON.
 */
function parseMessageRows(rows) {
  return rows.map((row) => ({
    ...row,
    data: JSON.parse(row.data),
  }));
}

/**
 * SQLite-backed store for crawled announcements plus an outgoing message
 * queue, both kept in a single database file.
 */
class SQLiteMessageQueue {
  /**
   * Open the database, create the schema and register shutdown hooks.
   *
   * @param {string} [dbPath="spider_data.db"] - Path of the SQLite file.
   */
  constructor(dbPath = "spider_data.db") {
    this.db = new Database(dbPath);
    this.init();
    this.setupGracefulShutdown();
  }

  /**
   * Create tables and indexes if they are missing and prepare the
   * statements reused by the other methods.
   */
  init() {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS announcements (
        id TEXT PRIMARY KEY,
        spider_name TEXT NOT NULL,
        name TEXT NOT NULL,
        publish_time TEXT,
        end_time TEXT,
        urls TEXT,
        created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at TEXT
      )
    `);
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS messages (
        id TEXT PRIMARY KEY,
        spider_name TEXT NOT NULL,
        data TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        status TEXT DEFAULT 'pending',
        sent_at TEXT,
        error_message TEXT
      )
    `);
    this.db.exec(`
      CREATE INDEX IF NOT EXISTS idx_announcements_spider ON announcements(spider_name);
      CREATE INDEX IF NOT EXISTS idx_announcements_time ON announcements(publish_time);
      CREATE INDEX IF NOT EXISTS idx_announcements_created ON announcements(created_at);
      CREATE INDEX IF NOT EXISTS idx_status ON messages(status);
      CREATE INDEX IF NOT EXISTS idx_spider_status ON messages(spider_name, status);
      CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
    `);

    // Announcement statements.
    this.insertAnnouncementStmt = this.db.prepare(`
      INSERT OR REPLACE INTO announcements
      (id, spider_name, name, publish_time, end_time, urls, created_at, updated_at)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `);
    this.getAnnouncementStmt = this.db.prepare(`
      SELECT * FROM announcements WHERE id = ?
    `);
    this.getAnnouncementsBySpiderStmt = this.db.prepare(`
      SELECT * FROM announcements WHERE spider_name = ?
      ORDER BY created_at DESC
    `);
    this.checkAnnouncementExistsStmt = this.db.prepare(`
      SELECT COUNT(*) as count FROM announcements WHERE id = ?
    `);

    // Message-queue statements, prepared once so they can be reused.
    this.insertStmt = this.db.prepare(`
      INSERT INTO messages (id, spider_name, data, timestamp, status)
      VALUES (?, ?, ?, ?, ?)
    `);
    this.getPendingStmt = this.db.prepare(`
      SELECT * FROM messages WHERE status = 'pending'
      ORDER BY timestamp ASC
    `);
    this.getFailedStmt = this.db.prepare(`
      SELECT * FROM messages WHERE status = 'failed'
      ORDER BY timestamp ASC
    `);
    this.updateStatusStmt = this.db.prepare(`
      UPDATE messages
      SET status = ?, sent_at = ?, error_message = ?
      WHERE id = ?
    `);
  }

  /**
   * Insert or replace one announcement.
   *
   * The row is rewritten with INSERT OR REPLACE, so the `created_at` of an
   * already stored announcement is read first and carried over; `updated_at`
   * is always set to the current time.
   *
   * @param {string} spiderName - Name of the spider that produced the item.
   * @param {object} announcement - Reads `id`, `name`, `publishTime`,
   *   `endTime` and `urls`.
   * @returns {boolean} `true` if no row with this id existed before the call.
   */
  saveAnnouncement(spiderName, announcement) {
    const now = new Date().toISOString();
    // One lookup answers both "is it new?" and "what was its created_at?".
    const existing = this.getAnnouncement(announcement.id);
    const isNew = existing == null;
    const createdAt = existing?.created_at || now;

    this.insertAnnouncementStmt.run(
      announcement.id,
      spiderName,
      announcement.name,
      announcement.publishTime,
      announcement.endTime,
      announcement.urls,
      createdAt,
      now
    );
    return isNew;
  }

  /**
   * Save a batch of announcements and report which ones were new.
   *
   * @param {string} spiderName - Name of the spider that produced the items.
   * @param {Array<object>} announcements - Items to store.
   * @returns {Array<object>} The subset of `announcements` that was not
   *   stored before this call.
   */
  saveAnnouncements(spiderName, announcements) {
    // A single transaction keeps the batch fast.
    const saveMany = this.db.transaction((items) => {
      const added = [];
      for (const item of items) {
        if (this.saveAnnouncement(spiderName, item)) {
          added.push(item);
        }
      }
      return added;
    });

    const newAnnouncements = saveMany(announcements);
    console.log(`💾 ${spiderName}: 保存 ${announcements.length} 条公告`);
    return newAnnouncements;
  }

  /**
   * Check whether an announcement with the given id is stored.
   *
   * @param {string} announcementId - Announcement primary key.
   * @returns {boolean} `true` if a row with this id exists.
   */
  isAnnouncementExists(announcementId) {
    const { count } = this.checkAnnouncementExistsStmt.get(announcementId);
    return count > 0;
  }

  /**
   * Fetch one announcement row by id.
   *
   * @param {string} id - Announcement primary key.
   * @returns {object|undefined} Whatever the prepared statement's `get`
   *   yields for this id.
   */
  getAnnouncement(id) {
    return this.getAnnouncementStmt.get(id);
  }

  /**
   * Fetch all announcements of one spider, newest `created_at` first.
   *
   * @param {string} spiderName - Spider whose rows are wanted.
   * @returns {Array<object>} Matching rows.
   */
  getAnnouncementsBySpider(spiderName) {
    return this.getAnnouncementsBySpiderStmt.all(spiderName);
  }

  /**
   * Delete every announcement that belongs to the given spider.
   *
   * @param {string} spiderName - Spider whose rows are removed.
   * @returns {number} Number of deleted rows.
   */
  deleteAnnouncementsBySpider(spiderName) {
    const { changes } = this.db
      .prepare(`DELETE FROM announcements WHERE spider_name = ?`)
      .run(spiderName);
    console.log(`🗑️ 删除 ${spiderName} 的公告,共删除 ${changes}`);
    return changes;
  }

  /**
   * Keep only the announcements whose id is not stored yet.
   *
   * @param {string} spiderName - Not used by the lookup; ids are checked
   *   across all spiders.
   * @param {Array<object>} announcements - Candidates, each with an `id`.
   * @returns {Array<object>} Candidates that are not in the database.
   */
  filterNewAnnouncements(spiderName, announcements) {
    return announcements.filter(({ id }) => !this.isAnnouncementExists(id));
  }

  // ======================
  // Message queue methods
  // ======================

  /**
   * Append a message with status `pending` to the queue.
   *
   * @param {string} spiderName - Spider the payload belongs to.
   * @param {*} data - Payload; stored as JSON text.
   * @returns {string} Generated message id (`<epoch ms>-<random base36>`).
   */
  addMessage(spiderName, data) {
    // slice(2, 11) takes the same (up to) nine characters substr(2, 9) did.
    const randomPart = Math.random().toString(36).slice(2, 11);
    const id = `${Date.now()}-${randomPart}`;

    this.insertStmt.run(
      id,
      spiderName,
      JSON.stringify(data),
      new Date().toISOString(),
      "pending"
    );

    // NOTE: the immediate WeChat push (wechatPush) is disabled; messages are
    // only queued here.
    // A non-array payload counts as one item instead of logging "undefined".
    const itemCount = Array.isArray(data) ? data.length : 1;
    console.log(`📤 添加消息到队列: ${spiderName} - ${itemCount} 条数据`);
    return id;
  }

  /**
   * List messages with status `pending`, oldest timestamp first.
   *
   * @returns {Array<object>} Rows with `data` parsed from JSON.
   */
  getPendingMessages() {
    return parseMessageRows(this.getPendingStmt.all());
  }

  /**
   * List messages with status `failed`, oldest timestamp first.
   *
   * @returns {Array<object>} Rows with `data` parsed from JSON.
   */
  getFailedMessages() {
    return parseMessageRows(this.getFailedStmt.all());
  }

  /**
   * Update the delivery state of one message.
   *
   * @param {string} id - Message id.
   * @param {string} status - New status value.
   * @param {string|null} [sentAt=null] - Value for the `sent_at` column.
   * @param {string|null} [errorMessage=null] - Value for `error_message`.
   */
  updateMessageStatus(id, status, sentAt = null, errorMessage = null) {
    this.updateStatusStmt.run(status, sentAt, errorMessage, id);
  }

  /**
   * Import announcements from a JSON file containing an array.
   *
   * Best effort: any failure is logged and reported as 0 instead of thrown.
   *
   * @param {string} spiderName - Spider the imported items are stored under.
   * @param {string} jsonFilePath - Path of the JSON file.
   * @returns {number} Number of items passed to the database, or 0 when the
   *   file is missing, empty, not an array, or the import failed.
   */
  migrateFromJsonFile(spiderName, jsonFilePath) {
    try {
      if (!fs.existsSync(jsonFilePath)) {
        console.log(`📁 ${jsonFilePath} 不存在,跳过迁移`);
        return 0;
      }

      const data = JSON.parse(fs.readFileSync(jsonFilePath, "utf-8"));
      if (!Array.isArray(data) || data.length === 0) {
        console.log(`📁 ${jsonFilePath} 数据为空,跳过迁移`);
        return 0;
      }

      const migrateMany = this.db.transaction((items) => {
        for (const item of items) {
          this.saveAnnouncement(spiderName, item);
        }
      });
      migrateMany(data);

      // Count and spider name are separated so they no longer run together.
      console.log(`🔄 成功迁移 ${data.length} 条 ${spiderName} 数据到数据库`);
      return data.length;
    } catch (error) {
      console.error(`❌ 迁移 ${jsonFilePath} 失败:`, error);
      return 0;
    }
  }

  /**
   * Delete messages with status `sent` whose `sent_at` is older than the
   * cutoff.
   *
   * @param {number} [daysBefore=30] - Age threshold in days.
   * @returns {number} Number of deleted rows.
   */
  cleanOldMessages(daysBefore = 30) {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - daysBefore);

    const { changes } = this.db
      .prepare(
        `
        DELETE FROM messages
        WHERE status = 'sent' AND sent_at < ?
      `
      )
      .run(cutoffDate.toISOString());
    console.log(`🧹 清理了 ${changes} 条旧消息`);
    return changes;
  }

  /**
   * Collect a summary of the stored data.
   *
   * @returns {{announcements: Array<object>, messages: Array<object>}}
   *   `announcements` holds one `{spider_name, count}` row per spider;
   *   `messages` holds `{status, data, sent_at}` for every pending message,
   *   with `data` left as the stored JSON text.
   */
  getStats() {
    // Announcement count per spider.
    const announcements = this.db
      .prepare(
        `
        SELECT spider_name, COUNT(*) as count
        FROM announcements
        GROUP BY spider_name
      `
      )
      .all();

    // Pending messages only; this is a row listing, not an aggregate.
    const messages = this.db
      .prepare(
        `
        SELECT status, data, sent_at
        FROM messages WHERE status = 'pending'
      `
      )
      .all();

    return { announcements, messages };
  }

  /**
   * Register process-level handlers that close the database and then exit:
   * code 0 for SIGINT/SIGTERM, code 1 for uncaught exceptions and unhandled
   * promise rejections.
   */
  setupGracefulShutdown() {
    // The exit must happen even if closing the database throws.
    const shutdown = (exitCode) => {
      try {
        this.close();
      } catch (closeError) {
        console.error("关闭数据库失败:", closeError);
      } finally {
        process.exit(exitCode);
      }
    };

    for (const signal of ["SIGINT", "SIGTERM"]) {
      process.on(signal, () => {
        console.log(`收到 ${signal} 信号,正在关闭数据库...`);
        shutdown(0);
      });
    }

    process.on("uncaughtException", (error) => {
      console.error("未捕获异常:", error);
      shutdown(1);
    });

    process.on("unhandledRejection", (reason) => {
      console.error("未处理的Promise拒绝:", reason);
      shutdown(1);
    });
  }

  /**
   * Close the database connection. Does nothing when the connection is
   * missing or no longer reports itself as open.
   */
  close() {
    if (this.db?.open) {
      this.db.close();
    }
  }
}
export { SQLiteMessageQueue };