//! Code from <https://github.com/mvniekerk/tokio-cron-scheduler/blob/6c568541022317cc07905ffd25305f0e6e2cfc74/examples/lib.rs>
|
|
|
|
use chrono::Utc;
|
|
use std::time::Duration;
|
|
use tokio_cron_scheduler::{Job, JobBuilder, JobScheduler, JobSchedulerError};
|
|
use tracing::{error, info, warn};
|
|
use uuid::Uuid;
|
|
|
|
pub async fn run_example(sched: &mut JobScheduler) -> Result<Vec<Uuid>, JobSchedulerError> {
|
|
sched.shutdown_on_ctrl_c();
|
|
|
|
sched.set_shutdown_handler(Box::new(|| {
|
|
Box::pin(async move {
|
|
info!("Shut down done");
|
|
})
|
|
}));
|
|
|
|
let mut five_s_job = Job::new("1/5 * * * * *", |uuid, _l| {
|
|
info!(
|
|
"{:?} I run every 5 seconds id {:?}",
|
|
chrono::Utc::now(),
|
|
uuid
|
|
);
|
|
})
|
|
.unwrap();
|
|
|
|
// Adding a job notification without it being added to the scheduler will automatically add it to
|
|
// the job store, but with stopped marking
|
|
five_s_job
|
|
.on_removed_notification_add(
|
|
&sched,
|
|
Box::new(|job_id, notification_id, type_of_notification| {
|
|
Box::pin(async move {
|
|
info!(
|
|
"5s Job {:?} was removed, notification {:?} ran ({:?})",
|
|
job_id, notification_id, type_of_notification
|
|
);
|
|
})
|
|
}),
|
|
)
|
|
.await?;
|
|
let five_s_job_guid = five_s_job.guid();
|
|
sched.add(five_s_job).await?;
|
|
|
|
let mut four_s_job_async = Job::new_async_tz("1/4 * * * * *", Utc, |uuid, mut l| {
|
|
Box::pin(async move {
|
|
info!("I run async every 4 seconds id {:?}", uuid);
|
|
let next_tick = l.next_tick_for_job(uuid).await;
|
|
match next_tick {
|
|
Ok(Some(ts)) => info!("Next time for 4s is {:?}", ts),
|
|
_ => warn!("Could not get next tick for 4s job"),
|
|
}
|
|
})
|
|
})
|
|
.unwrap();
|
|
let four_s_job_async_clone = four_s_job_async.clone();
|
|
let js = sched.clone();
|
|
info!("4s job id {:?}", four_s_job_async.guid());
|
|
four_s_job_async.on_start_notification_add(&sched, Box::new(move |job_id, notification_id, type_of_notification| {
|
|
let four_s_job_async_clone = four_s_job_async_clone.clone();
|
|
let js = js.clone();
|
|
Box::pin(async move {
|
|
info!("4s Job {:?} ran on start notification {:?} ({:?})", job_id, notification_id, type_of_notification);
|
|
info!("This should only run once since we're going to remove this notification immediately.");
|
|
info!("Removed? {:?}", four_s_job_async_clone.on_start_notification_remove(&js, ¬ification_id).await);
|
|
})
|
|
})).await?;
|
|
|
|
four_s_job_async
|
|
.on_done_notification_add(
|
|
&sched,
|
|
Box::new(|job_id, notification_id, type_of_notification| {
|
|
Box::pin(async move {
|
|
info!(
|
|
"4s Job {:?} completed and ran notification {:?} ({:?})",
|
|
job_id, notification_id, type_of_notification
|
|
);
|
|
})
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
let four_s_job_guid = four_s_job_async.guid();
|
|
sched.add(four_s_job_async).await?;
|
|
|
|
sched
|
|
.add(
|
|
Job::new("1/30 * * * * *", |uuid, _l| {
|
|
info!("I run every 30 seconds id {:?}", uuid);
|
|
})
|
|
.unwrap(),
|
|
)
|
|
.await?;
|
|
|
|
info!(
|
|
"Sched one shot for {:?}",
|
|
chrono::Utc::now()
|
|
.checked_add_signed(chrono::Duration::seconds(10))
|
|
.unwrap()
|
|
);
|
|
sched
|
|
.add(
|
|
Job::new_one_shot(Duration::from_secs(10), |_uuid, _l| {
|
|
info!("I'm only run once");
|
|
})
|
|
.unwrap(),
|
|
)
|
|
.await?;
|
|
|
|
info!(
|
|
"Sched one shot async for {:?}",
|
|
chrono::Utc::now()
|
|
.checked_add_signed(chrono::Duration::seconds(16))
|
|
.unwrap()
|
|
);
|
|
sched
|
|
.add(
|
|
Job::new_one_shot_async(Duration::from_secs(16), |_uuid, _l| {
|
|
Box::pin(async move {
|
|
info!("I'm only run once async");
|
|
})
|
|
})
|
|
.unwrap(),
|
|
)
|
|
.await?;
|
|
|
|
let jj = Job::new_repeated(Duration::from_secs(8), |_uuid, _l| {
|
|
info!("I'm repeated every 8 seconds");
|
|
})
|
|
.unwrap();
|
|
let jj_guid = jj.guid();
|
|
sched.add(jj).await?;
|
|
|
|
let jja = Job::new_repeated_async(Duration::from_secs(7), |_uuid, _l| {
|
|
Box::pin(async move {
|
|
info!("I'm repeated async every 7 seconds");
|
|
})
|
|
})
|
|
.unwrap();
|
|
let jja_guid = jja.guid();
|
|
sched.add(jja).await?;
|
|
|
|
let utc_job = JobBuilder::new()
|
|
.with_timezone(Utc)
|
|
.with_cron_job_type()
|
|
.with_schedule("*/2 * * * * *")
|
|
.unwrap()
|
|
.with_run_async(Box::new(|uuid, mut l| {
|
|
Box::pin(async move {
|
|
info!("UTC run async every 2 seconds id {:?}", uuid);
|
|
let next_tick = l.next_tick_for_job(uuid).await;
|
|
match next_tick {
|
|
Ok(Some(ts)) => info!("Next time for UTC 2s is {:?}", ts),
|
|
_ => warn!("Could not get next tick for 2s job"),
|
|
}
|
|
})
|
|
}))
|
|
.build()
|
|
.unwrap();
|
|
|
|
let utc_job_guid = utc_job.guid();
|
|
sched.add(utc_job).await.unwrap();
|
|
|
|
let jhb_job = JobBuilder::new()
|
|
.with_timezone(chrono_tz::Africa::Johannesburg)
|
|
.with_cron_job_type()
|
|
.with_schedule("*/2 * * * * *")
|
|
.unwrap()
|
|
.with_run_async(Box::new(|uuid, mut l| {
|
|
Box::pin(async move {
|
|
info!("JHB run async every 2 seconds id {:?}", uuid);
|
|
let next_tick = l.next_tick_for_job(uuid).await;
|
|
match next_tick {
|
|
Ok(Some(ts)) => info!("Next time for JHB 2s is {:?}", ts),
|
|
_ => warn!("Could not get next tick for 2s job"),
|
|
}
|
|
})
|
|
}))
|
|
.build()
|
|
.unwrap();
|
|
|
|
let jhb_job_guid = jhb_job.guid();
|
|
sched.add(jhb_job).await.unwrap();
|
|
|
|
let english_job_guid = {
|
|
let english_job = JobBuilder::new()
|
|
.with_timezone(Utc)
|
|
.with_cron_job_type()
|
|
// .with_schedule("every 10 seconds")
|
|
.with_schedule("every 10 seconds")
|
|
.unwrap()
|
|
.with_run_async(Box::new(|uuid, mut l| {
|
|
Box::pin(async move {
|
|
info!("English parsed job every 10 seconds id {:?}", uuid);
|
|
let next_tick = l.next_tick_for_job(uuid).await;
|
|
match next_tick {
|
|
Ok(Some(ts)) => info!("Next time for English parsed job is is {:?}", ts),
|
|
_ => warn!("Could not get next tick for English parsed job"),
|
|
}
|
|
})
|
|
}))
|
|
.build()
|
|
.unwrap();
|
|
|
|
let english_job_guid = english_job.guid();
|
|
sched.add(english_job).await.unwrap();
|
|
english_job_guid
|
|
};
|
|
|
|
let start = sched.start().await;
|
|
if let Err(e) = start {
|
|
error!("Error starting scheduler {}", e);
|
|
return Err(e);
|
|
}
|
|
|
|
let ret = vec![
|
|
five_s_job_guid,
|
|
four_s_job_guid,
|
|
jj_guid,
|
|
jja_guid,
|
|
utc_job_guid,
|
|
jhb_job_guid,
|
|
english_job_guid,
|
|
];
|
|
Ok(ret)
|
|
}
|
|
|
|
pub async fn stop_example(
|
|
sched: &mut JobScheduler,
|
|
jobs: Vec<Uuid>,
|
|
) -> Result<(), JobSchedulerError> {
|
|
tokio::time::sleep(Duration::from_secs(20)).await;
|
|
|
|
for i in jobs {
|
|
sched.remove(&i).await?;
|
|
}
|
|
|
|
tokio::time::sleep(Duration::from_secs(40)).await;
|
|
|
|
info!("Goodbye.");
|
|
sched.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(test)]
mod test {
    use tokio_cron_scheduler::{Job, JobScheduler};
    use tracing::{info, Level};
    use tracing_subscriber::FmtSubscriber;

    /// Smoke test: schedules a job that fires every second and lets the
    /// scheduler run for 20 seconds with TRACE-level logging enabled.
    ///
    /// Uses the multi-threaded runtime on purpose; with the default
    /// `#[tokio::test]` flavor it hangs on `scheduler.add()`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_schedule() {
        let trace_subscriber = FmtSubscriber::builder()
            .with_max_level(Level::TRACE)
            .finish();
        tracing::subscriber::set_global_default(trace_subscriber)
            .expect("Setting default subscriber failed");

        info!("Create scheduler");
        let scheduler = JobScheduler::new().await.unwrap();

        info!("Add job");
        let every_second = Job::new_async("*/1 * * * * *", |_, _| {
            Box::pin(async {
                info!("Run every seconds");
            })
        })
        .unwrap();
        scheduler
            .add(every_second)
            .await
            .expect("Should be able to add a job");

        scheduler.start().await.unwrap();

        // Keep the test alive long enough for the job to fire repeatedly.
        let run_time = core::time::Duration::from_secs(20);
        tokio::time::sleep(run_time).await;
    }
}
|