Add two bins using crate apalis-sql.

This commit is contained in:
Erik Nordstrøm 2025-02-23 00:05:11 +01:00
parent beb4fd40c5
commit b98050dc7a
4 changed files with 263 additions and 0 deletions

114
src/apalis_email_service.rs Normal file
View file

@ -0,0 +1,114 @@
use std::{str::FromStr, sync::Arc};
use apalis::prelude::*;
use email_address::EmailAddress;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Email {
pub to: String,
pub subject: String,
pub text: String,
}
pub async fn send_email(job: Email) -> Result<(), Error> {
let validation = EmailAddress::from_str(&job.to);
match validation {
Ok(email) => {
log::info!("Attempting to send email to {}", email.as_str());
Ok(())
}
Err(email_address::Error::InvalidCharacter) => {
log::error!("Killed send email job. Invalid character {}", job.to);
Err(Error::Abort(Arc::new(Box::new(
email_address::Error::InvalidCharacter,
))))
}
Err(e) => Err(Error::Failed(Arc::new(Box::new(e)))),
}
}
pub fn example_good_email() -> Email {
Email {
subject: "Test Subject".to_string(),
to: "example@gmail.com".to_string(),
text: "Some Text".to_string(),
}
}
pub fn example_killed_email() -> Email {
Email {
subject: "Test Subject".to_string(),
to: "example@©.com".to_string(), // killed because it has © which is invalid
text: "Some Text".to_string(),
}
}
pub fn example_retry_able_email() -> Email {
Email {
subject: "Test Subject".to_string(),
to: "example".to_string(),
text: "Some Text".to_string(),
}
}
pub const FORM_HTML: &str = r#"
<!doctype html>
<html>
<head>
<link href="https://unpkg.com/tailwindcss@1.2.0/dist/tailwind.min.css" rel="stylesheet">
<meta credits="https://tailwindcomponents.com/component/basic-contact-form" />
</head>
<body>
<form style="margin: 0 auto;" class="w-full max-w-lg pt-20" action="/" method="post">
<div class="flex flex-wrap -mx-3 mb-6">
<div class="w-full md:w-2/3 px-3 mb-6 md:mb-0">
<label class="block uppercase tracking-wide text-gray-700 text-xs font-bold mb-2" for="to">
To
</label>
<input class="appearance-none block w-full bg-gray-200 text-gray-700 border border-red-500 rounded py-3 px-4 mb-3 leading-tight focus:outline-none focus:bg-white" id="to" type="email" name="to" placeholder="test@example.com">
<p class="text-red-500 text-xs italic">Please fill out this field.</p>
</div>
</div>
<div class="flex flex-wrap -mx-3 mb-6">
<div class="w-full px-3">
<label class="block uppercase tracking-wide text-gray-700 text-xs font-bold mb-2" for="subject">
Subject
</label>
<input class="appearance-none block w-full bg-gray-200 text-gray-700 border border-gray-200 rounded py-3 px-4 mb-3 leading-tight focus:outline-none focus:bg-white focus:border-gray-500" id="subject" type="text" name="subject">
<p class="text-gray-600 text-xs italic">Some tips - as long as needed</p>
</div>
</div>
<div class="flex flex-wrap -mx-3 mb-6">
<div class="w-full px-3">
<label class="block uppercase tracking-wide text-gray-700 text-xs font-bold mb-2" for="text">
Message
</label>
<textarea class=" no-resize appearance-none block w-full bg-gray-200 text-gray-700 border border-gray-200 rounded py-3 px-4 mb-3 leading-tight focus:outline-none focus:bg-white focus:border-gray-500 h-48 resize-none" id="text" name="text" ></textarea>
</div>
</div>
<div class="md:flex md:items-center">
<div class="md:w-1/3">
<button class="shadow bg-teal-400 hover:bg-teal-400 focus:shadow-outline focus:outline-none text-white font-bold py-2 px-4 rounded" type="submit">
Send
</button>
</div>
<div class="md:w-2/3"></div>
</div>
</form>
</body>
</html>
"#;
#[derive(Debug)]
pub enum EmailError {
NoStorage,
SomeError(&'static str),
}
impl std::fmt::Display for EmailError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}

View file

@ -0,0 +1,64 @@
use anyhow::Result;
use apalis::layers::retry::RetryPolicy;
use apalis::prelude::*;
use apalis_sql::{
postgres::{PgListen, PgPool, PostgresStorage},
Config,
};
use jobs::apalis_email_service::{send_email, Email};
use tracing::{debug, info};
async fn produce_jobs(storage: &mut PostgresStorage<Email>) -> Result<()> {
for index in 0..10 {
storage
.push(Email {
to: format!("test{}@example.com", index),
text: "Test background job from apalis".to_string(),
subject: "Background email job".to_string(),
})
.await?;
}
// The sql way
tracing::info!("You can also add jobs via sql query, run this: \n Select apalis.push_job('apalis::Email', json_build_object('subject', 'Test apalis', 'to', 'test1@example.com', 'text', 'Lorem Ipsum'));");
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=error");
tracing_subscriber::fmt::init();
let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db");
let pool = PgPool::connect(&database_url).await?;
PostgresStorage::setup(&pool)
.await
.expect("unable to run migrations for postgres");
let mut pg = PostgresStorage::new_with_config(pool.clone(), Config::new("apalis::Email"));
produce_jobs(&mut pg).await?;
let mut listener = PgListen::new(pool).await?;
listener.subscribe_with(&mut pg);
tokio::spawn(async move {
listener.listen().await.unwrap();
});
Monitor::new()
.register({
WorkerBuilder::new("tasty-orange")
.retry(RetryPolicy::retries(5))
.enable_tracing()
.backend(pg)
.build_fn(send_email)
})
.on_event(|e| debug!("{e}"))
.run_with_signal(async {
tokio::signal::ctrl_c().await?;
info!("Shutting down the system");
Ok(())
})
.await?;
Ok(())
}

View file

@ -0,0 +1,84 @@
use anyhow::Result;
use apalis::prelude::*;
use apalis_sql::sqlite::SqliteStorage;
use chrono::Utc;
use jobs::apalis_email_service::{send_email, Email};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
#[derive(Debug, Deserialize, Serialize)]
pub struct Notification {
pub to: String,
pub text: String,
}
pub async fn notify(job: Notification) {
tracing::info!("Attempting to send notification to {}", job.to);
}
async fn produce_emails(storage: &SqliteStorage<Email>) -> Result<()> {
let mut storage = storage.clone();
for i in 0..1 {
storage
.schedule(
Email {
to: format!("test{i}@example.com"),
text: "Test background job from apalis".to_string(),
subject: "Background email job".to_string(),
},
(Utc::now() + chrono::Duration::seconds(4)).timestamp(),
)
.await?;
}
Ok(())
}
async fn produce_notifications(storage: &SqliteStorage<Notification>) -> Result<()> {
let mut storage = storage.clone();
for i in 0..20 {
storage
.push(Notification {
to: format!("notify:{i}@example.com"),
text: "Test background job from apalis".to_string(),
})
.await?;
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "debug,sqlx::query=info");
tracing_subscriber::fmt::init();
let pool = SqlitePool::connect("sqlite::memory:").await?;
// Do migrations: Mainly for "sqlite::memory:"
SqliteStorage::setup(&pool)
.await
.expect("unable to run migrations for sqlite");
let email_storage: SqliteStorage<Email> = SqliteStorage::new(pool.clone());
produce_emails(&email_storage).await?;
let notification_storage: SqliteStorage<Notification> = SqliteStorage::new(pool);
produce_notifications(&notification_storage).await?;
Monitor::new()
.register({
WorkerBuilder::new("tasty-banana")
.enable_tracing()
.backend(email_storage)
.build_fn(send_email)
})
.register({
WorkerBuilder::new("tasty-mango")
// .enable_tracing()
.backend(notification_storage)
.build_fn(notify)
})
.run()
.await?;
Ok(())
}

View file

@ -1 +1,2 @@
pub mod apalis_email_service;
pub mod tcs_helpers;