Add three bins using crate tokio-cron-scheduler.

This commit is contained in:
Erik Nordstrøm 2025-02-22 19:50:10 +01:00
parent c76a20e37c
commit 692756e2cf
5 changed files with 393 additions and 0 deletions

View file

@ -0,0 +1,49 @@
use jobs::tcs_helpers::{run_example, stop_example};
use tokio_cron_scheduler::{
JobScheduler, PostgresMetadataStore, PostgresNotificationStore, SimpleJobCode,
SimpleNotificationCode,
};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
info!("Remember to have a running Postgres instance to connect to. For example:\n");
info!("docker run --rm -it -p 5432:5432 -e POSTGRES_USER=\"postgres\" -e POSTGRES_PASSWORD=\"\" -e POSTGRES_HOST_AUTH_METHOD=\"trust\" postgres:14.1");
let metadata_storage = Box::new(PostgresMetadataStore::default());
let notification_storage = Box::new(PostgresNotificationStore::default());
if std::env::var("POSTGRES_INIT_METADATA").is_err() {
info!("Set to not initialize the job metadata tables. POSTGRES_INIT_METADATA=false");
}
if std::env::var("POSTGRES_INIT_NOTIFICATIONS").is_err() {
info!(
"Set to not initialization of notification tables. POSTGRES_INIT_NOTIFICATIONS=false"
);
}
let simple_job_code = Box::new(SimpleJobCode::default());
let simple_notification_code = Box::new(SimpleNotificationCode::default());
let mut sched = JobScheduler::new_with_storage_and_code(
metadata_storage,
notification_storage,
simple_job_code,
simple_notification_code,
200,
)
.await
.unwrap();
let jobs = run_example(&mut sched)
.await
.expect("Could not run example");
stop_example(&mut sched, jobs)
.await
.expect("Could not stop example");
}

View file

@ -0,0 +1,20 @@
use jobs::tcs_helpers::{run_example, stop_example};
use tokio_cron_scheduler::JobScheduler;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
let sched = JobScheduler::new_with_channel_size(1000).await;
let mut sched = sched.unwrap();
let jobs = run_example(&mut sched)
.await
.expect("Could not run example");
stop_example(&mut sched, jobs)
.await
.expect("Could not stop example");
}

View file

@ -0,0 +1,38 @@
use jobs::tcs_helpers::{run_example, stop_example};
use std::error::Error;
use tokio_cron_scheduler::JobScheduler;
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
fn main() {
let handle = std::thread::Builder::new()
.name("schedule thread".to_string())
.spawn(move || {
// tokio::runtime::Builder::new_current_thread() <- This hangs
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("build runtime failed")
.block_on(start())
.expect("TODO: panic message");
})
.expect("spawn thread failed");
handle.join().expect("join failed");
}
async fn start() -> Result<(), Box<dyn Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
info!("Creating scheduler");
let mut sched = JobScheduler::new().await?;
info!("Run example");
let jobs = run_example(&mut sched)
.await
.expect("Could not run example");
stop_example(&mut sched, jobs)
.await
.expect("Could not stop example");
Ok(())
}

1
src/lib.rs Normal file
View file

@ -0,0 +1 @@
pub mod tcs_helpers;

285
src/tcs_helpers.rs Normal file
View file

@ -0,0 +1,285 @@
use chrono::Utc;
use std::time::Duration;
use tokio_cron_scheduler::{Job, JobBuilder, JobScheduler, JobSchedulerError};
use tracing::{error, info, warn};
use uuid::Uuid;
pub async fn run_example(sched: &mut JobScheduler) -> Result<Vec<Uuid>, JobSchedulerError> {
#[cfg(feature = "signal")]
sched.shutdown_on_ctrl_c();
sched.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
info!("Shut down done");
})
}));
let mut five_s_job = Job::new("1/5 * * * * *", |uuid, _l| {
info!(
"{:?} I run every 5 seconds id {:?}",
chrono::Utc::now(),
uuid
);
})
.unwrap();
// Adding a job notification without it being added to the scheduler will automatically add it to
// the job store, but with stopped marking
five_s_job
.on_removed_notification_add(
&sched,
Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
info!(
"5s Job {:?} was removed, notification {:?} ran ({:?})",
job_id, notification_id, type_of_notification
);
})
}),
)
.await?;
let five_s_job_guid = five_s_job.guid();
sched.add(five_s_job).await?;
let mut four_s_job_async = Job::new_async_tz("1/4 * * * * *", Utc, |uuid, mut l| {
Box::pin(async move {
info!("I run async every 4 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for 4s is {:?}", ts),
_ => warn!("Could not get next tick for 4s job"),
}
})
})
.unwrap();
let four_s_job_async_clone = four_s_job_async.clone();
let js = sched.clone();
info!("4s job id {:?}", four_s_job_async.guid());
four_s_job_async.on_start_notification_add(&sched, Box::new(move |job_id, notification_id, type_of_notification| {
let four_s_job_async_clone = four_s_job_async_clone.clone();
let js = js.clone();
Box::pin(async move {
info!("4s Job {:?} ran on start notification {:?} ({:?})", job_id, notification_id, type_of_notification);
info!("This should only run once since we're going to remove this notification immediately.");
info!("Removed? {:?}", four_s_job_async_clone.on_start_notification_remove(&js, &notification_id).await);
})
})).await?;
four_s_job_async
.on_done_notification_add(
&sched,
Box::new(|job_id, notification_id, type_of_notification| {
Box::pin(async move {
info!(
"4s Job {:?} completed and ran notification {:?} ({:?})",
job_id, notification_id, type_of_notification
);
})
}),
)
.await?;
let four_s_job_guid = four_s_job_async.guid();
sched.add(four_s_job_async).await?;
sched
.add(
Job::new("1/30 * * * * *", |uuid, _l| {
info!("I run every 30 seconds id {:?}", uuid);
})
.unwrap(),
)
.await?;
info!(
"Sched one shot for {:?}",
chrono::Utc::now()
.checked_add_signed(chrono::Duration::seconds(10))
.unwrap()
);
sched
.add(
Job::new_one_shot(Duration::from_secs(10), |_uuid, _l| {
info!("I'm only run once");
})
.unwrap(),
)
.await?;
info!(
"Sched one shot async for {:?}",
chrono::Utc::now()
.checked_add_signed(chrono::Duration::seconds(16))
.unwrap()
);
sched
.add(
Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| {
Box::pin(async move {
info!("I'm only run once async");
})
})
.unwrap(),
)
.await?;
let jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
info!("I'm repeated every 8 seconds");
})
.unwrap();
let jj_guid = jj.guid();
sched.add(jj).await?;
let jja = Job::new_repeated_async(Duration::from_secs(7), |_uuid, _l| {
Box::pin(async move {
info!("I'm repeated async every 7 seconds");
})
})
.unwrap();
let jja_guid = jja.guid();
sched.add(jja).await?;
let utc_job = JobBuilder::new()
.with_timezone(Utc)
.with_cron_job_type()
.with_schedule("*/2 * * * * *")
.unwrap()
.with_run_async(Box::new(|uuid, mut l| {
Box::pin(async move {
info!("UTC run async every 2 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for UTC 2s is {:?}", ts),
_ => warn!("Could not get next tick for 2s job"),
}
})
}))
.build()
.unwrap();
let utc_job_guid = utc_job.guid();
sched.add(utc_job).await.unwrap();
let jhb_job = JobBuilder::new()
.with_timezone(chrono_tz::Africa::Johannesburg)
.with_cron_job_type()
.with_schedule("*/2 * * * * *")
.unwrap()
.with_run_async(Box::new(|uuid, mut l| {
Box::pin(async move {
info!("JHB run async every 2 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for JHB 2s is {:?}", ts),
_ => warn!("Could not get next tick for 2s job"),
}
})
}))
.build()
.unwrap();
let jhb_job_guid = jhb_job.guid();
sched.add(jhb_job).await.unwrap();
#[cfg(feature = "english")]
let english_job_guid = {
let english_job = JobBuilder::new()
.with_timezone(Utc)
.with_cron_job_type()
// .with_schedule("every 10 seconds")
.with_schedule("every 10 seconds")
.unwrap()
.with_run_async(Box::new(|uuid, mut l| {
Box::pin(async move {
info!("English parsed job every 10 seconds id {:?}", uuid);
let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Ok(Some(ts)) => info!("Next time for English parsed job is is {:?}", ts),
_ => warn!("Could not get next tick for English parsed job"),
}
})
}))
.build()
.unwrap();
let english_job_guid = english_job.guid();
sched.add(english_job).await.unwrap();
english_job_guid
};
let start = sched.start().await;
if let Err(e) = start {
error!("Error starting scheduler {}", e);
return Err(e);
}
let ret = vec![
five_s_job_guid,
four_s_job_guid,
jj_guid,
jja_guid,
utc_job_guid,
jhb_job_guid,
#[cfg(feature = "english")]
english_job_guid,
];
Ok(ret)
}
pub async fn stop_example(
sched: &mut JobScheduler,
jobs: Vec<Uuid>,
) -> Result<(), JobSchedulerError> {
tokio::time::sleep(Duration::from_secs(20)).await;
for i in jobs {
sched.remove(&i).await?;
}
tokio::time::sleep(Duration::from_secs(40)).await;
info!("Goodbye.");
sched.shutdown().await?;
Ok(())
}
fn main() {
eprintln!("Should not be run on its own.");
}
#[cfg(test)]
mod test {
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;
// Needs multi_thread to test, otherwise it hangs on scheduler.add()
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// #[tokio::test]
async fn test_schedule() {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("Setting default subscriber failed");
info!("Create scheduler");
let scheduler = JobScheduler::new().await.unwrap();
info!("Add job");
scheduler
.add(
Job::new_async("*/1 * * * * *", |_, _| {
Box::pin(async {
info!("Run every seconds");
})
})
.unwrap(),
)
.await
.expect("Should be able to add a job");
scheduler.start().await.unwrap();
tokio::time::sleep(core::time::Duration::from_secs(20)).await;
}
}