removed old tests, created new ones and format
This commit is contained in:
parent
7579afa3c3
commit
b03231b849
57 changed files with 4092 additions and 5901 deletions
|
|
@ -1,10 +1,7 @@
|
|||
import { getRandomUserAgent } from '@stock-bot/utils';
|
||||
import type { CeoHandler } from '../ceo.handler';
|
||||
|
||||
export async function getChannels(
|
||||
this: CeoHandler,
|
||||
payload: number | undefined
|
||||
): Promise<unknown> {
|
||||
export async function getChannels(this: CeoHandler, payload: number | undefined): Promise<unknown> {
|
||||
const proxy = this.proxy?.getProxy();
|
||||
if (!proxy) {
|
||||
this.logger.warn('No proxy available for CEO channels update');
|
||||
|
|
@ -20,7 +17,8 @@ export async function getChannels(
|
|||
try {
|
||||
this.logger.info(`Fetching CEO channels for page ${page} with proxy ${proxy}`);
|
||||
const response = await this.http.get(
|
||||
'https://api.ceo.ca/api/home?exchange=all&sort_by=symbol§or=All&tab=companies&page=' + page,
|
||||
'https://api.ceo.ca/api/home?exchange=all&sort_by=symbol§or=All&tab=companies&page=' +
|
||||
page,
|
||||
{
|
||||
proxy: proxy,
|
||||
headers: {
|
||||
|
|
@ -77,7 +75,7 @@ export async function getChannels(
|
|||
|
||||
this.logger.info(`Fetched CEO channels for page ${page}/${totalPages}`);
|
||||
return { page, totalPages };
|
||||
}catch (error) {
|
||||
} catch (error) {
|
||||
this.logger.error(`Error fetching CEO channels for page ${page} with proxy ${proxy}:`, error);
|
||||
throw new Error(`Failed to fetch CEO channels: ${error.message}`);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,7 @@
|
|||
import { getRandomUserAgent } from '@stock-bot/utils';
|
||||
import type { CeoHandler } from '../ceo.handler';
|
||||
|
||||
export async function getPosts(
|
||||
this: CeoHandler,
|
||||
payload: any,
|
||||
_context: any
|
||||
): Promise<unknown> {
|
||||
export async function getPosts(this: CeoHandler, payload: any, _context: any): Promise<unknown> {
|
||||
const { ceoId, symbol, timestamp, finished } = payload;
|
||||
const proxy = this.proxy?.getProxy();
|
||||
if (!proxy) {
|
||||
|
|
@ -40,21 +36,23 @@ export async function getPosts(
|
|||
const spielCount = data.spiels.length;
|
||||
if (spielCount === 0) {
|
||||
this.logger.warn(`No spiels found for ceoId ${ceoId}`);
|
||||
|
||||
|
||||
if (!timestamp) {
|
||||
// First call returned no posts - mark as finished with current time
|
||||
await this.mongodb.updateMany(
|
||||
'ceoSymbols',
|
||||
{ ceoId },
|
||||
{ $set: {
|
||||
finished: true,
|
||||
newestPostTimestamp: Date.now()
|
||||
}}
|
||||
{
|
||||
$set: {
|
||||
finished: true,
|
||||
newestPostTimestamp: Date.now(),
|
||||
},
|
||||
}
|
||||
);
|
||||
}
|
||||
return null; // No data to process
|
||||
}
|
||||
|
||||
|
||||
// Extract timestamps clearly
|
||||
const oldestPostInBatch = data.spiels[0]?.timestamp;
|
||||
const newestPostInBatch = data.spiels[data.spiels.length - 1]?.timestamp;
|
||||
|
|
@ -90,13 +88,13 @@ export async function getPosts(
|
|||
poll: spiel.poll,
|
||||
votedInPoll: spiel.voted_in_poll,
|
||||
}));
|
||||
|
||||
|
||||
// Handle based on collection state
|
||||
if (!finished) {
|
||||
// INITIAL COLLECTION MODE
|
||||
// Insert all posts - no duplicate checking needed on initial collection
|
||||
await this.mongodb.batchUpsert('ceoPosts', posts, ['spielId']);
|
||||
|
||||
|
||||
// Update newest timestamp on first call only
|
||||
if (!timestamp) {
|
||||
await this.mongodb.updateMany(
|
||||
|
|
@ -105,9 +103,9 @@ export async function getPosts(
|
|||
{ $set: { newestPostTimestamp: newestPostInBatch } }
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
this.logger.info(`Initial collection: fetched ${spielCount} spiels for ${ceoId}`);
|
||||
|
||||
|
||||
// Continue paginating backwards through history
|
||||
await this.scheduleOperation(
|
||||
'get-posts',
|
||||
|
|
@ -119,33 +117,33 @@ export async function getPosts(
|
|||
},
|
||||
{ priority: 0 }
|
||||
);
|
||||
|
||||
} else {
|
||||
// UPDATE COLLECTION MODE (finished=true)
|
||||
// Get the last known newest post
|
||||
const symbolData = await this.mongodb.findOne('ceoSymbols', { ceoId });
|
||||
const lastKnownNewestTimestamp = symbolData?.newestPostTimestamp || symbolData?.lastSpielTime || 0;
|
||||
|
||||
const lastKnownNewestTimestamp =
|
||||
symbolData?.newestPostTimestamp || symbolData?.lastSpielTime || 0;
|
||||
|
||||
// Filter to only posts newer than what we've seen
|
||||
const newPosts = posts.filter(p => p.timestamp > lastKnownNewestTimestamp);
|
||||
|
||||
|
||||
if (newPosts.length === 0) {
|
||||
this.logger.info(`No new posts for ${ceoId}, all ${posts.length} posts already collected`);
|
||||
return { ceoId, spielCount: 0, timestamp };
|
||||
}
|
||||
|
||||
|
||||
// Insert new posts
|
||||
await this.mongodb.batchUpsert('ceoPosts', newPosts, ['spielId']);
|
||||
|
||||
|
||||
// Update to the newest timestamp we've now seen
|
||||
await this.mongodb.updateMany(
|
||||
'ceoSymbols',
|
||||
{ ceoId },
|
||||
{ $set: { newestPostTimestamp: newestPostInBatch } }
|
||||
);
|
||||
|
||||
|
||||
this.logger.info(`Update collection: collected ${newPosts.length} new posts for ${ceoId}`);
|
||||
|
||||
|
||||
// If all posts in batch were new, there might be more in the gap
|
||||
// Continue paginating until we hit posts we've already seen
|
||||
if (newPosts.length === posts.length && oldestPostInBatch > lastKnownNewestTimestamp) {
|
||||
|
|
@ -165,17 +163,12 @@ export async function getPosts(
|
|||
|
||||
// If timestamp is not provided, run the short positions update
|
||||
if (!timestamp) {
|
||||
await this.scheduleOperation(
|
||||
'get-shorts',
|
||||
{
|
||||
symbol: symbol,
|
||||
},
|
||||
);
|
||||
await this.scheduleOperation('get-shorts', {
|
||||
symbol: symbol,
|
||||
});
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Successfully processed channel ${ceoId}`
|
||||
);
|
||||
this.logger.info(`Successfully processed channel ${ceoId}`);
|
||||
|
||||
return { ceoId, spielCount, timestamp };
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -1,11 +1,7 @@
|
|||
import { getRandomUserAgent } from '@stock-bot/utils';
|
||||
import type { CeoHandler } from '../ceo.handler';
|
||||
|
||||
export async function getShorts(
|
||||
this: CeoHandler,
|
||||
payload: any,
|
||||
_context: any
|
||||
): Promise<unknown> {
|
||||
export async function getShorts(this: CeoHandler, payload: any, _context: any): Promise<unknown> {
|
||||
const { ceoId, symbol, timestamp } = payload;
|
||||
const proxy = this.proxy?.getProxy();
|
||||
if (!proxy) {
|
||||
|
|
|
|||
|
|
@ -2,4 +2,3 @@ export { getChannels } from './get-channels.action';
|
|||
export { getPosts } from './get-posts.action';
|
||||
export { getShorts } from './get-shorts.action';
|
||||
export { updateUniqueSymbols } from './update-unique-symbols.action';
|
||||
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ export async function updateUniqueSymbols(
|
|||
let scheduledJobs = 0;
|
||||
for (const symbol of uniqueSymbols) {
|
||||
// Schedule a job to process this individual symbol
|
||||
if(action === 'get-posts') {
|
||||
if (action === 'get-posts') {
|
||||
await this.scheduleOperation(
|
||||
'get-posts',
|
||||
{
|
||||
|
|
@ -42,7 +42,7 @@ export async function updateUniqueSymbols(
|
|||
},
|
||||
{ priority: 10 }
|
||||
);
|
||||
} else if(action === 'get-shorts') {
|
||||
} else if (action === 'get-shorts') {
|
||||
await this.scheduleOperation(
|
||||
'get-shorts',
|
||||
{
|
||||
|
|
@ -54,7 +54,6 @@ export async function updateUniqueSymbols(
|
|||
);
|
||||
}
|
||||
|
||||
|
||||
scheduledJobs++;
|
||||
|
||||
// Add small delay to avoid overwhelming the queue
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ export class CeoHandler extends BaseHandler {
|
|||
batch: {
|
||||
size: 100,
|
||||
delayInHours: 0.5,
|
||||
}
|
||||
},
|
||||
})
|
||||
updateUniqueSymbolsPosts = updateUniqueSymbols;
|
||||
|
||||
|
|
@ -42,7 +42,7 @@ export class CeoHandler extends BaseHandler {
|
|||
size: 50,
|
||||
delayInHours: 2,
|
||||
direct: true, // Use direct mode for shorts
|
||||
}
|
||||
},
|
||||
})
|
||||
updateUniqueSymbolsShorts = updateUniqueSymbols;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
/**
|
||||
* Market data routes
|
||||
*/
|
||||
import { Hono } from 'hono';
|
||||
import type { IServiceContainer } from '@stock-bot/handlers';
|
||||
import { getLogger } from '@stock-bot/logger';
|
||||
import { processItems } from '@stock-bot/queue';
|
||||
import { Hono } from 'hono';
|
||||
|
||||
const logger = getLogger('market-data-routes');
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue