From 692756e2cf3b810d69a29003fd43e41c86f2d441 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B8m?= Date: Sat, 22 Feb 2025 19:50:10 +0100 Subject: [PATCH] Add three bins using crate tokio-cron-scheduler. --- ...crate-tokio-cron-scheduler-postgres_job.rs | 49 +++ ...g-crate-tokio-cron-scheduler-simple_job.rs | 20 ++ ...-scheduler-simple_job_tokio_in_a_thread.rs | 38 +++ src/lib.rs | 1 + src/tcs_helpers.rs | 285 ++++++++++++++++++ 5 files changed, 393 insertions(+) create mode 100644 src/bin/using-crate-tokio-cron-scheduler-postgres_job.rs create mode 100644 src/bin/using-crate-tokio-cron-scheduler-simple_job.rs create mode 100644 src/bin/using-crate-tokio-cron-scheduler-simple_job_tokio_in_a_thread.rs create mode 100644 src/lib.rs create mode 100644 src/tcs_helpers.rs diff --git a/src/bin/using-crate-tokio-cron-scheduler-postgres_job.rs b/src/bin/using-crate-tokio-cron-scheduler-postgres_job.rs new file mode 100644 index 0000000..39fc877 --- /dev/null +++ b/src/bin/using-crate-tokio-cron-scheduler-postgres_job.rs @@ -0,0 +1,49 @@ +use jobs::tcs_helpers::{run_example, stop_example}; +use tokio_cron_scheduler::{ + JobScheduler, PostgresMetadataStore, PostgresNotificationStore, SimpleJobCode, + SimpleNotificationCode, +}; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +#[tokio::main] +async fn main() { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::DEBUG) + .finish(); + tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed"); + + info!("Remember to have a running Postgres instance to connect to. For example:\n"); + info!("docker run --rm -it -p 5432:5432 -e POSTGRES_USER=\"postgres\" -e POSTGRES_PASSWORD=\"\" -e POSTGRES_HOST_AUTH_METHOD=\"trust\" postgres:14.1"); + + let metadata_storage = Box::new(PostgresMetadataStore::default()); + let notification_storage = Box::new(PostgresNotificationStore::default()); + if std::env::var("POSTGRES_INIT_METADATA").is_err() { + info!("Set to not initialize the job metadata tables. POSTGRES_INIT_METADATA=false"); + } + if std::env::var("POSTGRES_INIT_NOTIFICATIONS").is_err() { + info!( + "Set to not initialization of notification tables. POSTGRES_INIT_NOTIFICATIONS=false" + ); + } + + let simple_job_code = Box::new(SimpleJobCode::default()); + let simple_notification_code = Box::new(SimpleNotificationCode::default()); + + let mut sched = JobScheduler::new_with_storage_and_code( + metadata_storage, + notification_storage, + simple_job_code, + simple_notification_code, + 200, + ) + .await + .unwrap(); + + let jobs = run_example(&mut sched) + .await + .expect("Could not run example"); + stop_example(&mut sched, jobs) + .await + .expect("Could not stop example"); +} diff --git a/src/bin/using-crate-tokio-cron-scheduler-simple_job.rs b/src/bin/using-crate-tokio-cron-scheduler-simple_job.rs new file mode 100644 index 0000000..1eebf16 --- /dev/null +++ b/src/bin/using-crate-tokio-cron-scheduler-simple_job.rs @@ -0,0 +1,20 @@ +use jobs::tcs_helpers::{run_example, stop_example}; +use tokio_cron_scheduler::JobScheduler; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; + +#[tokio::main] +async fn main() { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed"); + let sched = JobScheduler::new_with_channel_size(1000).await; + let mut sched = sched.unwrap(); + let jobs = run_example(&mut sched) + .await + .expect("Could not run example"); + stop_example(&mut sched, jobs) + .await + .expect("Could not stop example"); +} diff --git a/src/bin/using-crate-tokio-cron-scheduler-simple_job_tokio_in_a_thread.rs b/src/bin/using-crate-tokio-cron-scheduler-simple_job_tokio_in_a_thread.rs new file mode 100644 index 0000000..6d82046 --- /dev/null +++ b/src/bin/using-crate-tokio-cron-scheduler-simple_job_tokio_in_a_thread.rs @@ -0,0 +1,38 @@ +use jobs::tcs_helpers::{run_example, stop_example}; +use std::error::Error; +use tokio_cron_scheduler::JobScheduler; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +fn main() { + let handle = std::thread::Builder::new() + .name("schedule thread".to_string()) + .spawn(move || { + // tokio::runtime::Builder::new_current_thread() <- This hangs + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("build runtime failed") + .block_on(start()) + .expect("TODO: panic message"); + }) + .expect("spawn thread failed"); + handle.join().expect("join failed"); +} + +async fn start() -> Result<(), Box> { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed"); + info!("Creating scheduler"); + let mut sched = JobScheduler::new().await?; + info!("Run example"); + let jobs = run_example(&mut sched) + .await + .expect("Could not run example"); + stop_example(&mut sched, jobs) + .await + .expect("Could not stop example"); + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..89180da --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +pub mod tcs_helpers; diff --git a/src/tcs_helpers.rs b/src/tcs_helpers.rs new file mode 100644 index 0000000..3ca7463 --- /dev/null +++ b/src/tcs_helpers.rs @@ -0,0 +1,285 @@ +use chrono::Utc; +use std::time::Duration; +use tokio_cron_scheduler::{Job, JobBuilder, JobScheduler, JobSchedulerError}; +use tracing::{error, info, warn}; +use uuid::Uuid; + +pub async fn run_example(sched: &mut JobScheduler) -> Result, JobSchedulerError> { + #[cfg(feature = "signal")] + sched.shutdown_on_ctrl_c(); + + sched.set_shutdown_handler(Box::new(|| { + Box::pin(async move { + info!("Shut down done"); + }) + })); + + let mut five_s_job = Job::new("1/5 * * * * *", |uuid, _l| { + info!( + "{:?} I run every 5 seconds id {:?}", + chrono::Utc::now(), + uuid + ); + }) + .unwrap(); + + // Adding a job notification without it being added to the scheduler will automatically add it to + // the job store, but with stopped marking + five_s_job + .on_removed_notification_add( + &sched, + Box::new(|job_id, notification_id, type_of_notification| { + Box::pin(async move { + info!( + "5s Job {:?} was removed, notification {:?} ran ({:?})", + job_id, notification_id, type_of_notification + ); + }) + }), + ) + .await?; + let five_s_job_guid = five_s_job.guid(); + sched.add(five_s_job).await?; + + let mut four_s_job_async = Job::new_async_tz("1/4 * * * * *", Utc, |uuid, mut l| { + Box::pin(async move { + info!("I run async every 4 seconds id {:?}", uuid); + let next_tick = l.next_tick_for_job(uuid).await; + match next_tick { + Ok(Some(ts)) => info!("Next time for 4s is {:?}", ts), + _ => warn!("Could not get next tick for 4s job"), + } + }) + }) + .unwrap(); + let four_s_job_async_clone = four_s_job_async.clone(); + let js = sched.clone(); + info!("4s job id {:?}", four_s_job_async.guid()); + four_s_job_async.on_start_notification_add(&sched, Box::new(move |job_id, notification_id, type_of_notification| { + let four_s_job_async_clone = four_s_job_async_clone.clone(); + let js = js.clone(); + Box::pin(async move { + info!("4s Job {:?} ran on start notification {:?} ({:?})", job_id, notification_id, type_of_notification); + info!("This should only run once since we're going to remove this notification immediately."); + info!("Removed? {:?}", four_s_job_async_clone.on_start_notification_remove(&js, ¬ification_id).await); + }) + })).await?; + + four_s_job_async + .on_done_notification_add( + &sched, + Box::new(|job_id, notification_id, type_of_notification| { + Box::pin(async move { + info!( + "4s Job {:?} completed and ran notification {:?} ({:?})", + job_id, notification_id, type_of_notification + ); + }) + }), + ) + .await?; + + let four_s_job_guid = four_s_job_async.guid(); + sched.add(four_s_job_async).await?; + + sched + .add( + Job::new("1/30 * * * * *", |uuid, _l| { + info!("I run every 30 seconds id {:?}", uuid); + }) + .unwrap(), + ) + .await?; + + info!( + "Sched one shot for {:?}", + chrono::Utc::now() + .checked_add_signed(chrono::Duration::seconds(10)) + .unwrap() + ); + sched + .add( + Job::new_one_shot(Duration::from_secs(10), |_uuid, _l| { + info!("I'm only run once"); + }) + .unwrap(), + ) + .await?; + + info!( + "Sched one shot async for {:?}", + chrono::Utc::now() + .checked_add_signed(chrono::Duration::seconds(16)) + .unwrap() + ); + sched + .add( + Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| { + Box::pin(async move { + info!("I'm only run once async"); + }) + }) + .unwrap(), + ) + .await?; + + let jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| { + info!("I'm repeated every 8 seconds"); + }) + .unwrap(); + let jj_guid = jj.guid(); + sched.add(jj).await?; + + let jja = Job::new_repeated_async(Duration::from_secs(7), |_uuid, _l| { + Box::pin(async move { + info!("I'm repeated async every 7 seconds"); + }) + }) + .unwrap(); + let jja_guid = jja.guid(); + sched.add(jja).await?; + + let utc_job = JobBuilder::new() + .with_timezone(Utc) + .with_cron_job_type() + .with_schedule("*/2 * * * * *") + .unwrap() + .with_run_async(Box::new(|uuid, mut l| { + Box::pin(async move { + info!("UTC run async every 2 seconds id {:?}", uuid); + let next_tick = l.next_tick_for_job(uuid).await; + match next_tick { + Ok(Some(ts)) => info!("Next time for UTC 2s is {:?}", ts), + _ => warn!("Could not get next tick for 2s job"), + } + }) + })) + .build() + .unwrap(); + + let utc_job_guid = utc_job.guid(); + sched.add(utc_job).await.unwrap(); + + let jhb_job = JobBuilder::new() + .with_timezone(chrono_tz::Africa::Johannesburg) + .with_cron_job_type() + .with_schedule("*/2 * * * * *") + .unwrap() + .with_run_async(Box::new(|uuid, mut l| { + Box::pin(async move { + info!("JHB run async every 2 seconds id {:?}", uuid); + let next_tick = l.next_tick_for_job(uuid).await; + match next_tick { + Ok(Some(ts)) => info!("Next time for JHB 2s is {:?}", ts), + _ => warn!("Could not get next tick for 2s job"), + } + }) + })) + .build() + .unwrap(); + + let jhb_job_guid = jhb_job.guid(); + sched.add(jhb_job).await.unwrap(); + + #[cfg(feature = "english")] + let english_job_guid = { + let english_job = JobBuilder::new() + .with_timezone(Utc) + .with_cron_job_type() + // .with_schedule("every 10 seconds") + .with_schedule("every 10 seconds") + .unwrap() + .with_run_async(Box::new(|uuid, mut l| { + Box::pin(async move { + info!("English parsed job every 10 seconds id {:?}", uuid); + let next_tick = l.next_tick_for_job(uuid).await; + match next_tick { + Ok(Some(ts)) => info!("Next time for English parsed job is is {:?}", ts), + _ => warn!("Could not get next tick for English parsed job"), + } + }) + })) + .build() + .unwrap(); + + let english_job_guid = english_job.guid(); + sched.add(english_job).await.unwrap(); + english_job_guid + }; + + let start = sched.start().await; + if let Err(e) = start { + error!("Error starting scheduler {}", e); + return Err(e); + } + + let ret = vec![ + five_s_job_guid, + four_s_job_guid, + jj_guid, + jja_guid, + utc_job_guid, + jhb_job_guid, + #[cfg(feature = "english")] + english_job_guid, + ]; + Ok(ret) +} + +pub async fn stop_example( + sched: &mut JobScheduler, + jobs: Vec, +) -> Result<(), JobSchedulerError> { + tokio::time::sleep(Duration::from_secs(20)).await; + + for i in jobs { + sched.remove(&i).await?; + } + + tokio::time::sleep(Duration::from_secs(40)).await; + + info!("Goodbye."); + sched.shutdown().await?; + Ok(()) +} + +fn main() { + eprintln!("Should not be run on its own."); +} + +#[cfg(test)] +mod test { + use tokio_cron_scheduler::{Job, JobScheduler}; + use tracing::{info, Level}; + use tracing_subscriber::FmtSubscriber; + + // Needs multi_thread to test, otherwise it hangs on scheduler.add() + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + // #[tokio::test] + async fn test_schedule() { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Setting default subscriber failed"); + + info!("Create scheduler"); + let scheduler = JobScheduler::new().await.unwrap(); + info!("Add job"); + scheduler + .add( + Job::new_async("*/1 * * * * *", |_, _| { + Box::pin(async { + info!("Run every seconds"); + }) + }) + .unwrap(), + ) + .await + .expect("Should be able to add a job"); + + scheduler.start().await.unwrap(); + + tokio::time::sleep(core::time::Duration::from_secs(20)).await; + } +}