chore: oss 库 完善

pull/409/head
xiaojunnuo 2025-04-27 01:31:46 +08:00
parent 5abce916a8
commit 2943e0e58d
13 changed files with 232 additions and 82 deletions

View File

@ -52,7 +52,7 @@ export class AliossClient {
}
}
async uploadFile(filePath: string, content: Buffer) {
async uploadFile(filePath: string, content: Buffer | string) {
await this.init();
return await this.client.put(filePath, content);
}

View File

@ -11,7 +11,7 @@ export class FtpClient {
this.logger = opts.logger;
}
async connect(callback: (client: FtpClient) => Promise<void>) {
async connect(callback: (client: FtpClient) => Promise<any>) {
const ftp = await import("basic-ftp");
const Client = ftp.Client;
const client = new Client();
@ -21,7 +21,7 @@ export class FtpClient {
this.logger.info("FTP连接成功");
this.client = client;
try {
await callback(this);
return await callback(this);
} finally {
if (client) {
client.close();
@ -46,6 +46,12 @@ export class FtpClient {
}
async listDir(dir: string): Promise<any[]> {
if (!dir) {
return [];
}
if (!dir.endsWith("/")) {
dir = dir + "/";
}
this.logger.info(`开始列出目录${dir}`);
return await this.client.list(dir);
}

View File

@ -9,6 +9,7 @@ export type OssClientRemoveByOpts = {
};
export type OssFileItem = {
//文件全路径
path: string;
size: number;
//毫秒时间戳
@ -71,17 +72,18 @@ export abstract class BaseOssClient<A> implements IOssClient {
// do nothing
}
abstract remove(fileName: string): Promise<void>;
abstract remove(fileName: string, opts?: { joinRootDir?: boolean }): Promise<void>;
abstract upload(fileName: string, fileContent: Buffer): Promise<void>;
abstract download(fileName: string, savePath: string): Promise<void>;
abstract listDir(dir: string): Promise<OssFileItem[]>;
async removeBy(removeByOpts: OssClientRemoveByOpts): Promise<void> {
const list = await this.listDir(removeByOpts.dir);
// removeByOpts.beforeDays = 0;
const beforeDate = dayjs().subtract(removeByOpts.beforeDays, "day");
for (const item of list) {
if (item.lastModified && item.lastModified < beforeDate.valueOf()) {
await this.remove(item.path);
await this.remove(item.path, { joinRootDir: false });
}
}
}

View File

@ -37,7 +37,7 @@ export default class AliOssClientImpl extends BaseOssClient<AliossAccess> {
};
});
}
async upload(filePath: string, fileContent: Buffer) {
async upload(filePath: string, fileContent: Buffer | string) {
const key = this.join(this.rootDir, filePath);
this.logger.info(`开始上传文件: ${key}`);
await this.client.uploadFile(key, fileContent);
@ -45,8 +45,11 @@ export default class AliOssClientImpl extends BaseOssClient<AliossAccess> {
this.logger.info(`文件上传成功: ${filePath}`);
}
async remove(filePath: string) {
const key = this.join(this.rootDir, filePath);
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
const key = filePath;
// remove file from alioss
await this.client.removeFile(key);
this.logger.info(`文件删除成功: ${key}`);

View File

@ -5,23 +5,52 @@ import fs from "fs";
import { FtpAccess, FtpClient } from "../../ftp/index.js";
export default class FtpOssClientImpl extends BaseOssClient<FtpAccess> {
async download(fileName: string, savePath: string) {}
async listDir(dir: string) {
return [];
join(...strs: string[]) {
const str = super.join(...strs);
if (!str.startsWith("/")) {
return "/" + str;
}
return str;
}
async upload(filePath: string, fileContent: Buffer) {
async download(fileName: string, savePath: string) {
const client = this.getFtpClient();
await client.connect(async client => {
const tmpFilePath = path.join(os.tmpdir(), "cert", "http", filePath);
const dir = path.dirname(tmpFilePath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
const path = this.join(this.rootDir, fileName);
await client.download(path, savePath);
});
}
async listDir(dir: string) {
const client = this.getFtpClient();
return await client.connect(async (client: FtpClient) => {
const path = this.join(this.rootDir, dir);
const res = await client.listDir(path);
return res.map(item => {
return {
path: this.join(path, item.name),
size: item.size,
lastModified: item.modifiedAt.getTime(),
};
});
});
}
async upload(filePath: string, fileContent: Buffer | string) {
const client = this.getFtpClient();
await client.connect(async client => {
let tmpFilePath = fileContent as string;
if (typeof fileContent !== "string") {
tmpFilePath = path.join(os.tmpdir(), "cert", "oss", filePath);
const dir = path.dirname(tmpFilePath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
fs.writeFileSync(tmpFilePath, fileContent);
}
fs.writeFileSync(tmpFilePath, fileContent);
try {
// Write file to temp path
const path = this.join(this.rootDir, filePath);
await client.upload(path, tmpFilePath);
await client.upload(tmpFilePath, path);
} finally {
// Remove temp file
fs.unlinkSync(tmpFilePath);
@ -36,11 +65,14 @@ export default class FtpOssClientImpl extends BaseOssClient<FtpAccess> {
});
}
async remove(filePath: string) {
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
const client = this.getFtpClient();
await client.connect(async client => {
const path = this.join(this.rootDir, filePath);
await client.client.remove(path);
await client.client.remove(filePath);
this.logger.info(`删除文件成功: ${filePath}`);
});
}
}

View File

@ -29,19 +29,22 @@ export default class QiniuOssClientImpl extends BaseOssClient<QiniuOssAccess> {
const res = await this.client.listDir(this.access.bucket, path);
return res.items.map(item => {
return {
path: item.name,
path: item.key,
size: item.fsize,
lastModified: item.putTime,
//ns 纳秒去掉低4位 为毫秒
lastModified: Math.floor(item.putTime / 10000),
};
});
}
async upload(filePath: string, fileContent: Buffer) {
async upload(filePath: string, fileContent: Buffer | string) {
const path = this.join(this.rootDir, filePath);
await this.client.uploadFile(this.access.bucket, path, fileContent);
}
async remove(filePath: string) {
const path = this.join(this.rootDir, filePath);
await this.client.removeFile(this.access.bucket, path);
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
await this.client.removeFile(this.access.bucket, filePath);
}
}

View File

@ -79,8 +79,11 @@ export default class S3OssClientImpl extends BaseOssClient<S3Access> {
this.logger.info(`文件上传成功: ${filePath}`);
}
async remove(filePath: string) {
const key = path.join(this.rootDir, filePath);
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
const key = filePath;
// @ts-ignore
const { DeleteObjectCommand } = await import("@aws-sdk/client-s3");
await this.client.send(

View File

@ -1,31 +1,51 @@
import { BaseOssClient, OssClientRemoveByOpts, OssFileItem } from "../api.js";
import { BaseOssClient, OssFileItem } from "../api.js";
import path from "path";
import os from "os";
import fs from "fs";
import { SftpAccess, SshAccess, SshClient } from "../../ssh/index.js";
export default class SftpOssClientImpl extends BaseOssClient<SftpAccess> {
download(fileName: string, savePath: string): Promise<void> {
throw new Error("Method not implemented.");
async download(fileName: string, savePath: string): Promise<void> {
const path = this.join(this.rootDir, fileName);
const client = new SshClient(this.logger);
const access = await this.ctx.accessService.getById<SshAccess>(this.access.sshAccess);
await client.download({
connectConf: access,
filePath: path,
savePath,
});
}
removeBy(removeByOpts: OssClientRemoveByOpts): Promise<void> {
throw new Error("Method not implemented.");
}
listDir(dir: string): Promise<OssFileItem[]> {
throw new Error("Method not implemented.");
}
async upload(filePath: string, fileContent: Buffer) {
const tmpFilePath = path.join(os.tmpdir(), "cert", "http", filePath);
// Write file to temp path
const dir = path.dirname(tmpFilePath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
async listDir(dir: string): Promise<OssFileItem[]> {
const path = this.join(this.rootDir, dir);
const client = new SshClient(this.logger);
const access = await this.ctx.accessService.getById<SshAccess>(this.access.sshAccess);
const res = await client.listDir({
connectConf: access,
dir: path,
});
return res.map(item => {
return {
path: this.join(path, item.filename),
size: item.size,
lastModified: item.attrs.atime * 1000,
};
});
}
async upload(filePath: string, fileContent: Buffer | string) {
let tmpFilePath = fileContent as string;
if (typeof fileContent !== "string") {
tmpFilePath = path.join(os.tmpdir(), "cert", "oss", filePath);
const dir = path.dirname(tmpFilePath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
fs.writeFileSync(tmpFilePath, fileContent);
}
fs.writeFileSync(tmpFilePath, fileContent);
const access = await this.ctx.accessService.getById<SshAccess>(this.access.sshAccess);
const key = this.rootDir + filePath;
const key = this.join(this.rootDir, filePath);
try {
const client = new SshClient(this.logger);
await client.uploadFiles({
@ -37,6 +57,7 @@ export default class SftpOssClientImpl extends BaseOssClient<SftpAccess> {
remotePath: key,
},
],
uploadType: "sftp",
opts: {
mode: this.access?.fileMode ?? undefined,
},
@ -47,13 +68,15 @@ export default class SftpOssClientImpl extends BaseOssClient<SftpAccess> {
}
}
async remove(filePath: string) {
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
const access = await this.ctx.accessService.getById<SshAccess>(this.access.sshAccess);
const client = new SshClient(this.logger);
const key = this.rootDir + filePath;
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
await client.removeFiles({
connectConf: access,
files: [key],
files: [filePath],
});
}
}

View File

@ -4,6 +4,7 @@ import os from "os";
import fs from "fs";
import { SshAccess, SshClient } from "../../ssh/index.js";
//废弃
export default class SshOssClientImpl extends BaseOssClient<SshAccess> {
download(fileName: string, savePath: string): Promise<void> {
throw new Error("Method not implemented.");
@ -43,12 +44,14 @@ export default class SshOssClientImpl extends BaseOssClient<SshAccess> {
}
}
async remove(filePath: string) {
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
const client = new SshClient(this.logger);
const key = this.rootDir + filePath;
await client.removeFiles({
connectConf: this.access,
files: [key],
files: [filePath],
});
}
}

View File

@ -38,13 +38,17 @@ export default class TencentOssClientImpl extends BaseOssClient<TencentCosAccess
};
});
}
async upload(filePath: string, fileContent: Buffer) {
async upload(filePath: string, fileContent: Buffer | string) {
const key = this.join(this.rootDir, filePath);
await this.client.uploadFile(key, fileContent);
this.logger.info(`文件上传成功: ${filePath}`);
}
async remove(filePath: string) {
const key = this.join(this.rootDir, filePath);
await this.client.removeFile(key);
async remove(filePath: string, opts?: { joinRootDir?: boolean }) {
if (opts?.joinRootDir !== false) {
filePath = this.join(this.rootDir, filePath);
}
await this.client.removeFile(filePath);
this.logger.info(`文件删除成功: ${filePath}`);
}
}

View File

@ -136,6 +136,20 @@ export class AsyncSsh2Client {
});
}
async listDir(options: { sftp: any; remotePath: string }) {
const { sftp, remotePath } = options;
return new Promise((resolve, reject) => {
this.logger.info(`listDir${remotePath}`);
sftp.readdir(remotePath, (err: Error, list: any) => {
if (err) {
reject(err);
return;
}
resolve(list);
});
});
}
async unlink(options: { sftp: any; remotePath: string }) {
const { sftp, remotePath } = options;
return new Promise((resolve, reject) => {
@ -283,6 +297,28 @@ export class AsyncSsh2Client {
}
return proxy;
}
async download(param: { remotePath: string; savePath: string; sftp: any }) {
return new Promise((resolve, reject) => {
const { remotePath, savePath, sftp } = param;
sftp.fastGet(
remotePath,
savePath,
{
step: (transferred: any, chunk: any, total: any) => {
this.logger.info(`${transferred} / ${total}`);
},
},
(err: any) => {
if (err) {
reject(err);
} else {
resolve({});
}
}
);
});
}
}
export class SshClient {
@ -329,17 +365,17 @@ export class SshClient {
}
}
if (options.uploadType === "sftp") {
const sftp = await conn.getSftp();
for (const transport of transports) {
await conn.fastPut({ sftp, ...transport, opts });
}
} else {
if (options.uploadType === "scp") {
//scp
for (const transport of transports) {
await this.scpUpload({ conn, ...transport, opts });
await new Promise(resolve => setTimeout(resolve, 1000));
}
} else {
const sftp = await conn.getSftp();
for (const transport of transports) {
await conn.fastPut({ sftp, ...transport, opts });
}
}
this.logger.info("文件全部上传成功");
@ -359,25 +395,29 @@ export class SshClient {
if (err) {
return reject(err);
}
// 准备 SCP 协议头
const fileStats = fs.statSync(localPath);
const fileName = path.basename(localPath);
try {
// 准备 SCP 协议头
const fileStats = fs.statSync(localPath);
const fileName = path.basename(localPath);
// SCP 协议格式C[权限] [文件大小] [文件名]\n
stream.write(`C0644 ${fileStats.size} ${fileName}\n`);
// SCP 协议格式C[权限] [文件大小] [文件名]\n
stream.write(`C0644 ${fileStats.size} ${fileName}\n`);
// 通过管道传输文件
fs.createReadStream(localPath)
.on("error", e => {
this.logger.info("read stream error", e);
reject(e);
})
.pipe(stream)
.on("finish", async () => {
this.logger.info(`上传完成:${localPath} => ${remotePath}`);
resolve(true);
})
.on("error", reject);
// 通过管道传输文件
fs.createReadStream(localPath)
.on("error", e => {
this.logger.info("read stream error", e);
reject(e);
})
.pipe(stream)
.on("finish", async () => {
this.logger.info(`上传完成:${localPath} => ${remotePath}`);
resolve(true);
})
.on("error", reject);
} catch (e) {
reject(e);
}
}
);
} catch (e) {
@ -526,4 +566,31 @@ export class SshClient {
conn.end();
}
}
async listDir(param: { connectConf: any; dir: string }) {
return await this._call<any>({
connectConf: param.connectConf,
callable: async (conn: AsyncSsh2Client) => {
const sftp = await conn.getSftp();
return await conn.listDir({
sftp,
remotePath: param.dir,
});
},
});
}
async download(param: { connectConf: any; filePath: string; savePath: string }) {
return await this._call<any>({
connectConf: param.connectConf,
callable: async (conn: AsyncSsh2Client) => {
const sftp = await conn.getSftp();
return await conn.download({
sftp,
remotePath: param.filePath,
savePath: param.savePath,
});
},
});
}
}

View File

@ -27,12 +27,16 @@ export class TencentCosClient {
async uploadFile(key: string, file: Buffer | string) {
const cos = await this.getCosClient();
return new Promise((resolve, reject) => {
let readableStream = file as any;
if (typeof file === "string") {
readableStream = fs.createReadStream(file);
}
cos.putObject(
{
Bucket: this.bucket /* 必须 */,
Region: this.region /* 必须 */,
Key: key /* 必须 */,
Body: file, // 上传文件对象
Body: readableStream, // 上传文件对象
onProgress: function (progressData) {
console.log(JSON.stringify(progressData));
},

View File

@ -8,7 +8,7 @@ import * as os from "node:os";
import { OssClientContext, ossClientFactory, OssClientRemoveByOpts, SshAccess, SshClient } from "@certd/plugin-lib";
const defaultBackupDir = 'certd_backup';
const defaultFilePrefix = 'db-backup';
const defaultFilePrefix = 'db_backup';
@IsTaskPlugin({
name: 'DBBackupPlugin',
@ -165,7 +165,7 @@ export class DBBackupPlugin extends AbstractPlusTaskPlugin {
this.logger.error('数据库文件不存在:', dbPath);
return;
}
const dbTmpFilename = `${this.filePrefix}.${dayjs().format('YYYYMMDD.HHmmss')}.sqlite`;
const dbTmpFilename = `${this.filePrefix}_${dayjs().format('YYYYMMDD_HHmmss')}_sqlite`;
const dbZipFilename = `${dbTmpFilename}.zip`;
const tempDir = path.resolve(os.tmpdir(), 'certd_backup');
if (!fs.existsSync(tempDir)) {