The Persistence Layer provides crash-safe storage for signal state and risk management data in the backtest-kit framework. This layer implements atomic file writes to ensure no data loss during crashes, automatic recovery from corrupted files, and support for custom storage backends (Redis, MongoDB, PostgreSQL).
For signal lifecycle management, see Signal Lifecycle. For risk management data structures, see Risk Management. For logging infrastructure, see Logging System.
The persistence layer consists of three main components: the PersistBase base class, specialized adapters for signals and risk data, and the atomic file writing utility.
PersistBase is the foundation for all persistence implementations. It provides a file-based default implementation that can be overridden for custom backends.
The IPersistBase interface defines the contract all persistence implementations must fulfill:
The constructor accepts an entityName (e.g., "signal", "risk") and optional baseDir (default: ./logs/data). It computes the storage directory as baseDir/entityName:
| Parameter | Type | Default | Purpose |
|---|---|---|---|
| entityName | string | required | Unique identifier for entity type |
| baseDir | string | ./logs/data | Base directory for all persistence |
| _directory | string | computed | Full path: baseDir/entityName |
Write operations use writeFileAtomic to ensure crash safety (see Atomic File Writes):
The hasValue method checks if an entity exists without reading its contents:
| Method | Returns | Error Handling |
|---|---|---|
| hasValue(entityId) | true if exists, false if not exists | Returns false on ENOENT; throws on other errors |
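The existence check in the table can be sketched as follows, assuming the default file-based backend and Node's `fs/promises` API. The name `fileExists` is hypothetical and not part of backtest-kit; the sketch only illustrates mapping ENOENT to `false` while rethrowing every other error.

```typescript
import { access } from "node:fs/promises";

// Hypothetical helper, not the library's actual implementation.
// Checks for a file without reading its contents.
async function fileExists(filePath: string): Promise<boolean> {
  try {
    await access(filePath);
    return true;
  } catch (error) {
    // A missing file is an expected outcome, not a failure.
    if ((error as { code?: string }).code === "ENOENT") {
      return false;
    }
    // Permission problems, I/O errors, etc. are surfaced to the caller.
    throw error;
  }
}
```

Rethrowing non-ENOENT errors keeps a permissions problem from being silently mistaken for "entity not stored yet".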
```typescript
// Remove single entity
await persist.removeValue("BTCUSDT");

// Remove all entities for this type
await persist.removeAll();
```
PersistBase implements AsyncIterableIterator for convenient iteration over all stored entities:
Entities are sorted alphanumerically by ID using localeCompare with numeric sensitivity.
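As an illustration of that ordering, here is a small sketch using `localeCompare` with the `numeric` option. The helper name `sortEntityIds` and the exact option set (`sensitivity: "base"`) are assumptions; the source only states that numeric-aware comparison is used.

```typescript
// Hypothetical helper: sorts entity IDs so that embedded numbers compare
// by value ("log-2" before "log-10") rather than character by character.
function sortEntityIds(ids: string[]): string[] {
  return [...ids].sort((a, b) =>
    a.localeCompare(b, undefined, { numeric: true, sensitivity: "base" })
  );
}

const ordered = sortEntityIds(["log-10", "log-2", "log-1"]);
console.log(ordered); // ["log-1", "log-2", "log-10"]
```

A plain lexicographic sort would place "log-10" before "log-2", which is usually not the iteration order a reader expects.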
The writeFileAtomic function ensures that file writes either complete fully or leave the original file unchanged, preventing data corruption during crashes.
| Platform | Strategy | Atomicity | Temp File |
|---|---|---|---|
| POSIX | Temp file + rename | Full atomic replacement | .tmp-{random}-{filename} |
| Windows | Direct write + sync | Minimizes corruption risk | None |
On POSIX systems, if any step fails, the temporary file is cleaned up before rethrowing the error:
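The POSIX strategy (temp file, sync, rename, cleanup on failure) might look roughly like the sketch below. This is not the library's `writeFileAtomic`; the function names, the 6-byte random suffix, and the fixed `0o666` mode are assumptions chosen to match the table and options described in this section. The Windows fallback is not covered.

```typescript
import { randomBytes } from "node:crypto";
import { mkdtemp, open, readFile, rename, unlink } from "node:fs/promises";
import { tmpdir } from "node:os";
import { basename, dirname, join } from "node:path";

// Hypothetical sketch of a temp-file-plus-rename write.
async function writeFileAtomicSketch(filePath: string, data: string): Promise<void> {
  // Temp file lives next to the target so the rename stays on one filesystem.
  const tmpPath = join(
    dirname(filePath),
    `.tmp-${randomBytes(6).toString("hex")}-${basename(filePath)}`
  );
  try {
    const handle = await open(tmpPath, "w", 0o666);
    try {
      await handle.writeFile(data, "utf8");
      await handle.sync(); // flush contents to disk before the rename
    } finally {
      await handle.close();
    }
    await rename(tmpPath, filePath); // replaces the target in one step on POSIX
  } catch (error) {
    // Best-effort cleanup; the temp file may not exist if open() failed.
    await unlink(tmpPath).catch(() => undefined);
    throw error;
  }
}

// Usage sketch: write into a fresh temp directory and read the result back.
async function demoRoundTrip(): Promise<string> {
  const dir = await mkdtemp(join(tmpdir(), "atomic-"));
  const target = join(dir, "BTCUSDT.json");
  await writeFileAtomicSketch(target, JSON.stringify({ ok: true }));
  return readFile(target, "utf8");
}
```

Because readers only ever see the old file or the fully written new one, a crash between the write and the rename leaves at worst a stray temp file.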
```typescript
interface Options {
  encoding?: BufferEncoding | undefined; // Default: "utf8"
  mode?: number | undefined;             // Default: 0o666
  tmpPrefix?: string;                    // Default: ".tmp-"
}
```
The default implementation stores entities as JSON files in a hierarchical directory structure.
```
./logs/data/
├── signal/
│   ├── strategy-a/
│   │   ├── BTCUSDT.json
│   │   ├── ETHUSDT.json
│   │   └── SOLUSDT.json
│   └── strategy-b/
│       └── BTCUSDT.json
└── risk/
    ├── conservative/
    │   └── positions.json
    └── aggressive/
        └── positions.json
```
| Entity Type | Entity Name | Entity ID | File Path |
|---|---|---|---|
| signal | strategy-a | BTCUSDT | ./logs/data/signal/strategy-a/BTCUSDT.json |
| risk | conservative | positions | ./logs/data/risk/conservative/positions.json |
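The mapping in the table can be expressed as a small path builder. The helper `buildEntityPath` is hypothetical and only mirrors the table's columns; it uses `path.posix` so the result is the same on every platform.

```typescript
import { posix } from "node:path";

// Hypothetical helper, not part of backtest-kit.
// Joins base directory, entity type, entity name, and entity ID into a file path.
function buildEntityPath(
  entityType: string,
  entityName: string,
  entityId: string | number,
  baseDir: string = "./logs/data"
): string {
  return posix.join(baseDir, entityType, entityName, `${entityId}.json`);
}

console.log(buildEntityPath("signal", "strategy-a", "BTCUSDT"));
// logs/data/signal/strategy-a/BTCUSDT.json
```

Note that `join` normalizes the path, so the leading `./` shown in the table is dropped; both forms refer to the same location relative to the working directory.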
All entities are serialized using JSON.stringify and deserialized using JSON.parse:
```typescript
// Write: Entity → JSON string → File
const serializedData = JSON.stringify(entity);
await writeFileAtomic(filePath, serializedData, "utf-8");

// Read: File → JSON string → Entity
const fileContent = await fs.readFile(filePath, "utf-8");
return JSON.parse(fileContent) as T;
```
During waitForInit, the system validates all existing files and removes corrupted ones. The cleanup uses retry logic with configurable parameters:
| Constant | Value | Purpose |
|---|---|---|
| BASE_UNLINK_RETRY_COUNT | 5 | Number of retry attempts |
| BASE_UNLINK_RETRY_DELAY | 1000ms | Delay between retries |
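One way the retry parameters could be applied is sketched below. The `retry` helper is hypothetical, and treating the count as the total number of attempts (rather than retries after a first try) is an assumption; the source only gives the two constants.

```typescript
const BASE_UNLINK_RETRY_COUNT = 5;
const BASE_UNLINK_RETRY_DELAY = 1_000; // milliseconds

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: runs an async operation up to `count` times,
// waiting `delayMs` between failed attempts, and rethrows the last error.
async function retry<T>(
  operation: () => Promise<T>,
  count: number = BASE_UNLINK_RETRY_COUNT,
  delayMs: number = BASE_UNLINK_RETRY_DELAY
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= count; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < count) {
        await sleep(delayMs); // no delay after the final attempt
      }
    }
  }
  throw lastError;
}
```

In the cleanup path, the operation would be the removal of a corrupted file, for example `retry(() => unlink(filePath))` with `unlink` imported from `node:fs/promises`.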
The framework provides two specialized adapter classes for different data types: signals and risk positions.
PersistSignalAdaper manages signal state persistence for live trading. Each strategy-symbol combination gets its own file:
- Entity Name Pattern: signal/{strategyName}
- Entity ID Pattern: {symbol} (e.g., "BTCUSDT")
PersistRiskAdapter manages active position tracking for risk management. Each risk profile stores all positions in a single file:
- Entity Name Pattern: risk/{riskName}
- Entity ID Pattern: Always "positions"
Custom adapters are registered before running any strategies:
```typescript
import { PersistSignalAdaper, PersistRiskAdapter } from "backtest-kit";

// RedisPersist and MongoPersist are user-defined classes that implement IPersistBase.

// Register custom signal adapter
PersistSignalAdaper.usePersistSignalAdapter(RedisPersist);

// Register custom risk adapter
PersistRiskAdapter.usePersistRiskAdapter(MongoPersist);

// Now run strategies - they will use custom adapters
Live.background("BTCUSDT", { ... });
```
The waitForInit method implements crash recovery by validating existing data and cleaning up corrupted files.
The singleshot decorator ensures initialization happens only once, even if called multiple times.
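The run-once behavior can be illustrated with a minimal stand-in. This is a simplified sketch of the idea, not the `singleshot` utility the framework actually uses: it caches the first returned promise so that repeated and concurrent calls share one initialization.

```typescript
// Simplified stand-in for a run-once wrapper (an assumption for illustration).
function singleshot<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (pending === undefined) {
      pending = fn(); // first call starts the work
    }
    return pending; // later calls reuse the same promise
  };
}

// Usage: both calls await the same underlying initialization.
let initRuns = 0;
const waitForInitOnce = singleshot(async () => {
  initRuns += 1;
});
```

Caching the promise rather than a boolean flag means a second caller that arrives while initialization is still running waits for it instead of racing it.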
Error messages logged:
- "backtest-kit PersistBase found invalid document for filePath={path} entityName={name}"
- "backtest-kit PersistBase failed to remove invalid document for filePath={path} entityName={name}"

Custom persistence implementations must implement the IPersistBase interface with four required methods.
```typescript
interface IPersistBase<Entity extends IEntity = IEntity> {
  // Initialize connection/storage
  waitForInit(initial: boolean): Promise<void>;

  // Read entity by ID
  readValue(entityId: EntityId): Promise<Entity>;

  // Check if entity exists
  hasValue(entityId: EntityId): Promise<boolean>;

  // Write entity atomically
  writeValue(entityId: EntityId, entity: Entity): Promise<void>;
}
```
Key implementation details for a Redis-backed adapter:
| Method | Redis Operation | Key Pattern |
|---|---|---|
| readValue | redis.get(key) | {entityName}:{entityId} |
| hasValue | redis.exists(key) | {entityName}:{entityId} |
| writeValue | redis.set(key, data) | {entityName}:{entityId} |
| removeValue | redis.del(key) | {entityName}:{entityId} |
| removeAll | redis.keys() + redis.del() | {entityName}:* |
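To make the key pattern concrete, the sketch below implements the four required methods against a minimal key-value client. Everything here is an assumption for illustration: `KeyValueClient` is a hypothetical reduced surface rather than a real Redis client API, `InMemoryClient` is a Map-backed stand-in so the example runs without a server, and the constructor signature is not guaranteed to match what the adapter registries expect.

```typescript
type EntityId = string | number;

// Hypothetical minimal client surface; a real Redis client exposes much more.
interface KeyValueClient {
  get(key: string): Promise<string | null>;
  set(key: string, value: string): Promise<void>;
  exists(key: string): Promise<number>;
}

class KeyValuePersist<Entity> {
  private readonly entityName: string;
  private readonly client: KeyValueClient;

  constructor(entityName: string, client: KeyValueClient) {
    this.entityName = entityName;
    this.client = client;
  }

  // Key pattern from the table: {entityName}:{entityId}
  private key(entityId: EntityId): string {
    return `${this.entityName}:${entityId}`;
  }

  async waitForInit(_initial: boolean): Promise<void> {
    // A real backend would open or verify its connection here.
  }

  async readValue(entityId: EntityId): Promise<Entity> {
    const raw = await this.client.get(this.key(entityId));
    if (raw === null) {
      throw new Error(`Entity ${this.key(entityId)} not found`);
    }
    return JSON.parse(raw) as Entity;
  }

  async hasValue(entityId: EntityId): Promise<boolean> {
    return (await this.client.exists(this.key(entityId))) > 0;
  }

  async writeValue(entityId: EntityId, entity: Entity): Promise<void> {
    await this.client.set(this.key(entityId), JSON.stringify(entity));
  }
}

// In-memory stand-in so the sketch runs without a Redis server.
class InMemoryClient implements KeyValueClient {
  private readonly store = new Map<string, string>();

  async get(key: string): Promise<string | null> {
    return this.store.get(key) ?? null;
  }
  async set(key: string, value: string): Promise<void> {
    this.store.set(key, value);
  }
  async exists(key: string): Promise<number> {
    return this.store.has(key) ? 1 : 0;
  }
}
```

Prefixing every key with the entity name keeps signal and risk data separated in a shared keyspace, which is what makes a pattern-based removeAll possible.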
A MongoDB implementation uses one collection per entity type, with each document containing entityId, data, and updatedAt fields:
```typescript
// Collection schema
{
  entityId: string,  // e.g., "BTCUSDT"
  data: Entity,      // Serialized entity
  updatedAt: Date    // Last update timestamp
}
```
Key operations:
| Method | MongoDB Operation | Query Filter |
|---|---|---|
| readValue | collection.findOne() | { entityId } |
| hasValue | collection.countDocuments() | { entityId } |
| writeValue | collection.updateOne() (upsert) | { entityId } |
| removeValue | collection.deleteOne() | { entityId } |
| removeAll | collection.deleteMany() | {} |
Users can instantiate PersistBase directly for custom data storage:
```typescript
import { PersistBase } from "backtest-kit";

// Create persistence for custom entity type
const tradingLogs = new PersistBase("trading-logs", "./logs/custom");

// Initialize
await tradingLogs.waitForInit(true);

// Write log entry
await tradingLogs.writeValue("log-1", {
  timestamp: Date.now(),
  message: "Strategy started",
  metadata: { symbol: "BTCUSDT" }
});

// Read log entry
const log = await tradingLogs.readValue("log-1");

// Iterate over all logs
for await (const log of tradingLogs.values()) {
  console.log("Log:", log);
}

// Filter logs (helper method)
for await (const log of tradingLogs.filter((l) =>
  l.metadata.symbol === "BTCUSDT"
)) {
  console.log("BTC Log:", log);
}
```
Services use persistence adapters through the dependency injection system:
Tests use mock adapters to avoid file system operations:
```javascript
// test/config/setup.mjs
PersistSignalAdaper.usePersistSignalAdapter(class {
  async waitForInit() { /* no-op */ }
  async readValue() {
    throw new Error("Should not be called in tests");
  }
  async hasValue() { return false; }
  async writeValue() { /* no-op */ }
});
```
This pattern ensures tests run fast and don't leave artifacts on disk.
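The Persistence Layer provides crash-safe atomic file writes, automatic recovery from corrupted files, and support for custom storage backends.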
Key Classes:
- PersistBase - Base class with file-based default implementation
- IPersistBase - Interface for custom persistence backends
- writeFileAtomic - Atomic file writing utility
- PersistSignalAdaper - Adapter registry for signal persistence
- PersistRiskAdapter - Adapter registry for risk persistence