diff --git a/src/apalis_email_service.rs b/src/apalis_email_service.rs new file mode 100644 index 0000000..252fee6 --- /dev/null +++ b/src/apalis_email_service.rs @@ -0,0 +1,114 @@ +use std::{str::FromStr, sync::Arc}; + +use apalis::prelude::*; +use email_address::EmailAddress; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct Email { + pub to: String, + pub subject: String, + pub text: String, +} + +pub async fn send_email(job: Email) -> Result<(), Error> { + let validation = EmailAddress::from_str(&job.to); + match validation { + Ok(email) => { + log::info!("Attempting to send email to {}", email.as_str()); + Ok(()) + } + Err(email_address::Error::InvalidCharacter) => { + log::error!("Killed send email job. Invalid character {}", job.to); + Err(Error::Abort(Arc::new(Box::new( + email_address::Error::InvalidCharacter, + )))) + } + Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))), + } +} + +pub fn example_good_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@gmail.com".to_string(), + text: "Some Text".to_string(), + } +} + +pub fn example_killed_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example@©.com".to_string(), // killed because it has © which is invalid + text: "Some Text".to_string(), + } +} + +pub fn example_retry_able_email() -> Email { + Email { + subject: "Test Subject".to_string(), + to: "example".to_string(), + text: "Some Text".to_string(), + } +} + +pub const FORM_HTML: &str = r#" + + + + + + + +
+
+
+ + +

Please fill out this field.

+
+ +
+
+
+ + +

Some tips - as long as needed

+
+
+
+
+ + +
+
+
+
+ +
+
+
+
+ + + "#; + +#[derive(Debug)] +pub enum EmailError { + NoStorage, + SomeError(&'static str), +} + +impl std::fmt::Display for EmailError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} diff --git a/src/bin/using-crate-apalis-postgres.rs b/src/bin/using-crate-apalis-postgres.rs new file mode 100644 index 0000000..c597767 --- /dev/null +++ b/src/bin/using-crate-apalis-postgres.rs @@ -0,0 +1,64 @@ +use anyhow::Result; +use apalis::layers::retry::RetryPolicy; +use apalis::prelude::*; +use apalis_sql::{ + postgres::{PgListen, PgPool, PostgresStorage}, + Config, +}; +use jobs::apalis_email_service::{send_email, Email}; +use tracing::{debug, info}; + +async fn produce_jobs(storage: &mut PostgresStorage) -> Result<()> { + for index in 0..10 { + storage + .push(Email { + to: format!("test{}@example.com", index), + text: "Test background job from apalis".to_string(), + subject: "Background email job".to_string(), + }) + .await?; + } + // The sql way + tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test apalis', 'to', 'test1@example.com', 'text', 'Lorem Ipsum'));"); + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<()> { + std::env::set_var("RUST_LOG", "debug,sqlx::query=error"); + tracing_subscriber::fmt::init(); + let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db"); + + let pool = PgPool::connect(&database_url).await?; + PostgresStorage::setup(&pool) + .await + .expect("unable to run migrations for postgres"); + + let mut pg = PostgresStorage::new_with_config(pool.clone(), Config::new("apalis::Email")); + produce_jobs(&mut pg).await?; + + let mut listener = PgListen::new(pool).await?; + + listener.subscribe_with(&mut pg); + + tokio::spawn(async move { + listener.listen().await.unwrap(); + }); + + Monitor::new() + .register({ + WorkerBuilder::new("tasty-orange") + .retry(RetryPolicy::retries(5)) + .enable_tracing() + .backend(pg) + .build_fn(send_email) + }) + .on_event(|e| debug!("{e}")) + .run_with_signal(async { + tokio::signal::ctrl_c().await?; + info!("Shutting down the system"); + Ok(()) + }) + .await?; + Ok(()) +} diff --git a/src/bin/using-crate-apalis-sqlite.rs b/src/bin/using-crate-apalis-sqlite.rs new file mode 100644 index 0000000..a9cfe02 --- /dev/null +++ b/src/bin/using-crate-apalis-sqlite.rs @@ -0,0 +1,84 @@ +use anyhow::Result; +use apalis::prelude::*; +use apalis_sql::sqlite::SqliteStorage; +use chrono::Utc; +use jobs::apalis_email_service::{send_email, Email}; +use serde::{Deserialize, Serialize}; +use sqlx::SqlitePool; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Notification { + pub to: String, + pub text: String, +} + +pub async fn notify(job: Notification) { + tracing::info!("Attempting to send notification to {}", job.to); +} + +async fn produce_emails(storage: &SqliteStorage) -> Result<()> { + let mut storage = storage.clone(); + for i in 0..1 { + storage + .schedule( + Email { + to: format!("test{i}@example.com"), + text: "Test background job from apalis".to_string(), + subject: "Background email job".to_string(), + }, + (Utc::now() + chrono::Duration::seconds(4)).timestamp(), + ) + .await?; + } + Ok(()) +} + +async fn produce_notifications(storage: &SqliteStorage) -> Result<()> { + let mut storage = storage.clone(); + for i in 0..20 { + storage + .push(Notification { + to: format!("notify:{i}@example.com"), + text: "Test background job from apalis".to_string(), + }) + .await?; + } + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<()> { + std::env::set_var("RUST_LOG", "debug,sqlx::query=info"); + tracing_subscriber::fmt::init(); + + let pool = SqlitePool::connect("sqlite::memory:").await?; + // Do migrations: Mainly for "sqlite::memory:" + SqliteStorage::setup(&pool) + .await + .expect("unable to run migrations for sqlite"); + + let email_storage: SqliteStorage = SqliteStorage::new(pool.clone()); + + produce_emails(&email_storage).await?; + + let notification_storage: SqliteStorage = SqliteStorage::new(pool); + + produce_notifications(¬ification_storage).await?; + + Monitor::new() + .register({ + WorkerBuilder::new("tasty-banana") + .enable_tracing() + .backend(email_storage) + .build_fn(send_email) + }) + .register({ + WorkerBuilder::new("tasty-mango") + // .enable_tracing() + .backend(notification_storage) + .build_fn(notify) + }) + .run() + .await?; + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 89180da..506c8f2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,2 @@ +pub mod apalis_email_service; pub mod tcs_helpers;