Migrate database updates to use TimeEntrys over ReportRows
				
					
				
			This commit is contained in:
		
							parent
							
								
									62657f2bdd
								
							
						
					
					
						commit
						7759632848
					
				
							
								
								
									
										50
									
								
								src/db.rs
									
									
									
									
									
								
							
							
						
						
									
										50
									
								
								src/db.rs
									
									
									
									
									
								
							| @ -4,38 +4,24 @@ use sea_orm::sea_query::OnConflict; | |||||||
| use sea_orm::{NotSet, Set}; | use sea_orm::{NotSet, Set}; | ||||||
| 
 | 
 | ||||||
| impl ReportRow { | impl ReportRow { | ||||||
|     pub(crate) fn as_models(&self) -> Vec<time_entry::ActiveModel> { |     pub fn to_time_entries(&self, workspace_id: i64) -> Vec<TimeEntry> { | ||||||
|         self.time_entries |         self.time_entries.iter() | ||||||
|             .iter() |             .map(|inner| TimeEntry { | ||||||
|             .map(|inner| time_entry::ActiveModel { |                 id: inner.id as i64, | ||||||
|                 id: NotSet, |                 description: self.description.clone(), | ||||||
|                 toggl_id: Set(inner.id as i64), |                 project_id: self.project_id.map(|id| id as i64), | ||||||
|                 description: Set(self.description.clone()), |                 task_id: self.task_id.map(|id| id as i64), | ||||||
|                 project_id: Set(self.project_id.map(|id| id as i64)), |                 billable: self.billable, | ||||||
|                 start: Set(inner.start.fixed_offset()), |                 start: inner.start, | ||||||
|                 stop: Set(inner.stop.fixed_offset()), |                 stop: Some(inner.stop), | ||||||
|                 raw_json: Set(serde_json::to_value(inner).unwrap()), |                 at: inner.at, | ||||||
|                 server_deleted_at: NotSet, |                 server_deleted_at: None, | ||||||
|                 server_updated_at: Set(inner.at.fixed_offset()), |                 tags: vec![], // TODO: tags on report row import, need to track in separate table
 | ||||||
|                 // TODO: tags on report row import, need to track in separate table
 |                 workspace_id, | ||||||
|                 tags: NotSet, |                 duration: inner.seconds as i64, | ||||||
|             }) |                 tag_ids: self.tag_ids.iter().map(|ids| *ids as i64).collect(), | ||||||
|             .collect() |                 user_id: self.user_id as i64, | ||||||
|     } |             }).collect() | ||||||
| 
 |  | ||||||
|     pub fn grafting_conflict_statement() -> OnConflict { |  | ||||||
|         OnConflict::column(time_entry::Column::TogglId) |  | ||||||
|             .update_columns(vec![ |  | ||||||
|                 time_entry::Column::Description, |  | ||||||
|                 time_entry::Column::ProjectId, |  | ||||||
|                 time_entry::Column::Start, |  | ||||||
|                 time_entry::Column::Stop, |  | ||||||
|                 time_entry::Column::RawJson, |  | ||||||
|                 time_entry::Column::ServerUpdatedAt, |  | ||||||
|                 time_entry::Column::ServerDeletedAt, |  | ||||||
|                 // time_entry::Column::Tags, TODO add tags to conflict resolution when implemented
 |  | ||||||
|             ]) |  | ||||||
|             .to_owned() |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
							
								
								
									
										81
									
								
								src/poll.rs
									
									
									
									
									
								
							
							
						
						
									
										81
									
								
								src/poll.rs
									
									
									
									
									
								
							| @ -1,5 +1,5 @@ | |||||||
| use crate::entity::prelude::TimeEntry; | use crate::entity::prelude::TimeEntry; | ||||||
| use crate::entity::{client, project}; | use crate::entity::{client, project, time_entry}; | ||||||
| use crate::toggl_api::types::{Client, Project, TogglReportQuery}; | use crate::toggl_api::types::{Client, Project, TogglReportQuery}; | ||||||
| use crate::toggl_api::TogglApiClient; | use crate::toggl_api::TogglApiClient; | ||||||
| use crate::utils; | use crate::utils; | ||||||
| @ -11,7 +11,9 @@ use sea_orm::{ | |||||||
|     Statement, |     Statement, | ||||||
| }; | }; | ||||||
| use std::ops::Sub; | use std::ops::Sub; | ||||||
|  | use migration::Order; | ||||||
| use tracing::instrument; | use tracing::instrument; | ||||||
|  | use crate::sync_service::update_database; | ||||||
| 
 | 
 | ||||||
| #[tracing::instrument(skip(client, db))] | #[tracing::instrument(skip(client, db))] | ||||||
| pub async fn poll_job(client: TogglApiClient, db: DatabaseConnection, poll_period: u64) { | pub async fn poll_job(client: TogglApiClient, db: DatabaseConnection, poll_period: u64) { | ||||||
| @ -37,65 +39,36 @@ pub async fn poll_job(client: TogglApiClient, db: DatabaseConnection, poll_perio | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[instrument(skip(client, db))] | #[instrument(skip(toggl_client, db))] | ||||||
| pub async fn perform_poll( | pub async fn perform_poll( | ||||||
|     client: &TogglApiClient, |     toggl_client: &TogglApiClient, | ||||||
|     db: &DatabaseConnection, |     db: &DatabaseConnection, | ||||||
| ) -> utils::Result<usize> { | ) -> utils::Result<usize> { | ||||||
|     let since = db |     let since = time_entry::Entity::find() | ||||||
|         .query_one(Statement::from_string( |         .select_only() | ||||||
|             Postgres, |         .column(time_entry::Column::ServerUpdatedAt) | ||||||
|             r#"select server_updated_at from time_entry order by server_updated_at desc limit 1"#, |         .order_by(time_entry::Column::ServerUpdatedAt, Order::Desc) | ||||||
|         )) |         .into_tuple::<DateTime<FixedOffset>>() | ||||||
|         .await? |         .one(db) | ||||||
|         .map(|row| row.try_get_by::<DateTime<FixedOffset>, _>("server_updated_at")) |         .await?; | ||||||
|         .transpose()? |  | ||||||
|         .unwrap_or( |  | ||||||
|             chrono::Utc::now() |  | ||||||
|                 .sub(chrono::Duration::days(1)) |  | ||||||
|                 .fixed_offset(), |  | ||||||
|         ); |  | ||||||
| 
 | 
 | ||||||
|     let time_entries = client |     let since = since.unwrap_or( | ||||||
|  |         chrono::Utc::now() | ||||||
|  |             .sub(chrono::Duration::days(1)) | ||||||
|  |             .fixed_offset(), | ||||||
|  |     ); | ||||||
|  | 
 | ||||||
|  |     let time_entries = toggl_client | ||||||
|         .fetch_time_entries_modified_since(since.to_utc()) |         .fetch_time_entries_modified_since(since.to_utc()) | ||||||
|         .await?; |         .await?; | ||||||
| 
 | 
 | ||||||
|     let existing_project_ids = project::Entity::find() |     // These are changes only so there is no need to enforce exclusivity
 | ||||||
|         .select_only() |     update_database( | ||||||
|         .column(project::Column::TogglId) |         db, | ||||||
|         .into_tuple::<i64>() |         toggl_client, | ||||||
|         .all(db) |         &time_entries, | ||||||
|         .await?; |         None, | ||||||
|  |     ).await?; | ||||||
| 
 | 
 | ||||||
|     let new_projects = time_entries |     Ok(time_entries.len()) | ||||||
|         .iter() |  | ||||||
|         .filter_map(|entry| entry.project_id) |  | ||||||
|         .any(|project_id| !existing_project_ids.contains(&(project_id as i64))); |  | ||||||
| 
 |  | ||||||
|     if new_projects { |  | ||||||
|         let clients = client.fetch_clients().await?; |  | ||||||
| 
 |  | ||||||
|         client::Entity::insert_many(clients.iter().map(Client::as_model)) |  | ||||||
|             .on_conflict(Client::grafting_conflict_statement()) |  | ||||||
|             .exec(db) |  | ||||||
|             .await?; |  | ||||||
| 
 |  | ||||||
|         let projects = client.fetch_projects().await?; |  | ||||||
| 
 |  | ||||||
|         project::Entity::insert_many(projects.iter().map(Project::as_model)) |  | ||||||
|             .on_conflict(Project::grafting_conflict_statement()) |  | ||||||
|             .exec(db) |  | ||||||
|             .await?; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     crate::sync_service::cache_report( |  | ||||||
|         &db, |  | ||||||
|         &report, |  | ||||||
|         Some(day_exclusivity_condition( |  | ||||||
|             now.date_naive(), |  | ||||||
|             now.date_naive(), |  | ||||||
|         )), |  | ||||||
|     ) |  | ||||||
|     .await?; |  | ||||||
|     Ok(report.len()) |  | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,5 +1,3 @@ | |||||||
| use crate::entity::time_entry; |  | ||||||
| use crate::entity::time_entry::Entity as TimeEntry; |  | ||||||
| use crate::toggl_api::types::{self, Client, Project, ReportRow, TogglReportQuery}; | use crate::toggl_api::types::{self, Client, Project, ReportRow, TogglReportQuery}; | ||||||
| use crate::toggl_api::TogglApiClient; | use crate::toggl_api::TogglApiClient; | ||||||
| use crate::{entity, sync_service, utils}; | use crate::{entity, sync_service, utils}; | ||||||
| @ -8,13 +6,12 @@ use axum::extract::{Multipart, Query}; | |||||||
| use axum::http::StatusCode; | use axum::http::StatusCode; | ||||||
| use axum::response::IntoResponse; | use axum::response::IntoResponse; | ||||||
| use axum::{Extension, Json}; | use axum::{Extension, Json}; | ||||||
| use chrono::NaiveDate; | use chrono::{DateTime, Utc}; | ||||||
| use migration::Condition; | use sea_orm::{DatabaseConnection, EntityTrait}; | ||||||
| use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; |  | ||||||
| use serde::Deserialize; | use serde::Deserialize; | ||||||
| use serde_json::Value; | use serde_json::Value; | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use tracing::{debug, instrument}; | use tracing::{instrument}; | ||||||
| 
 | 
 | ||||||
| #[instrument(skip(db, toggl_client))] | #[instrument(skip(db, toggl_client))] | ||||||
| pub async fn report( | pub async fn report( | ||||||
| @ -23,11 +20,13 @@ pub async fn report( | |||||||
|     Json(query): Json<TogglReportQuery>, |     Json(query): Json<TogglReportQuery>, | ||||||
| ) -> utils::Result<Json<Vec<ReportRow>>> { | ) -> utils::Result<Json<Vec<ReportRow>>> { | ||||||
|     let report = toggl_client.full_report(&query).await?; |     let report = toggl_client.full_report(&query).await?; | ||||||
|     debug!("Returned results: {:?}", report); |     let time_entries = report.iter().flat_map(|entry| entry.to_time_entries(toggl_client.workspace_id())).collect::<Vec<_>>(); | ||||||
| 
 |     sync_service::update_database( | ||||||
|     // We don't perform any deletes on report-fetched entries as they aren't necessarily exclusive
 |         &db, | ||||||
|     // on their time range.
 |         &toggl_client, | ||||||
|     sync_service::cache_report(&db, &report, None).await?; |         &time_entries, | ||||||
|  |         None, | ||||||
|  |     ).await?; | ||||||
| 
 | 
 | ||||||
|     Ok(Json(report)) |     Ok(Json(report)) | ||||||
| } | } | ||||||
| @ -111,29 +110,41 @@ pub async fn import_csv( | |||||||
| 
 | 
 | ||||||
| #[derive(Debug, Clone, Deserialize)] | #[derive(Debug, Clone, Deserialize)] | ||||||
| pub struct RefreshQuery { | pub struct RefreshQuery { | ||||||
|     start_date: Option<String>, |     start_date: Option<DateTime<Utc>>, | ||||||
|  |     end_date: Option<DateTime<Utc>>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[instrument(skip(toggl_client, db))] | #[instrument(skip(toggl_client, db))] | ||||||
| pub async fn refresh( | pub async fn refresh( | ||||||
|     Extension(toggl_client): Extension<TogglApiClient>, |     Extension(toggl_client): Extension<TogglApiClient>, | ||||||
|     Extension(db): Extension<DatabaseConnection>, |     Extension(db): Extension<DatabaseConnection>, | ||||||
|     Query(RefreshQuery { start_date }): Query<RefreshQuery>, |     Query(RefreshQuery { start_date, end_date }): Query<RefreshQuery>, | ||||||
| ) -> utils::Result<&'static str> { | ) -> utils::Result<&'static str> { | ||||||
|     let end_date = chrono::Utc::now(); |     let time_entries = match (start_date, end_date) { | ||||||
|     let end_date_query_string = end_date.date_naive().format("%Y-%m-%d").to_string(); |         (Some(start_date), Some(end_date)) => { | ||||||
|     let start_date_query_string = start_date.unwrap_or(end_date_query_string.clone()); |             toggl_client.fetch_time_entries_in_range(start_date, end_date).await? | ||||||
|     let start_date = NaiveDate::parse_from_str(&start_date_query_string, "%Y-%m-%d")?; |         } | ||||||
| 
 | 
 | ||||||
|     let query = TogglReportQuery { |         (Some(start_date), None) => { | ||||||
|         start_date: Some(start_date_query_string), |             let end_date = Utc::now(); | ||||||
|         end_date: Some(end_date_query_string), |             toggl_client.fetch_time_entries_in_range(start_date, end_date).await? | ||||||
|         ..Default::default() |         } | ||||||
|  | 
 | ||||||
|  |         (None, Some(_)) => { | ||||||
|  |             return Err(anyhow!("start_date must be provided if end_date is provided").into()); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         _ => { | ||||||
|  |             toggl_client.fetch_recent_time_entries().await? | ||||||
|  |         } | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|     let report = toggl_client.full_report(&query).await?; |     sync_service::update_database( | ||||||
|     let exclusivity_condition = utils::day_exclusivity_condition(start_date, end_date.date_naive()); |         &db, | ||||||
|     sync_service::cache_report(&db, &report, Some(exclusivity_condition)).await?; |         &toggl_client, | ||||||
|  |         &time_entries, | ||||||
|  |         None, | ||||||
|  |     ).await?; | ||||||
| 
 | 
 | ||||||
|     Ok("Ok") |     Ok("Ok") | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,24 +1,69 @@ | |||||||
| use crate::entity::time_entry; | use crate::entity::{client, project, time_entry}; | ||||||
| use crate::entity::time_entry::Entity as TimeEntry; | use crate::entity::time_entry::Entity as TimeEntry; | ||||||
| use crate::toggl_api::types::ReportRow; | use crate::toggl_api::types::{Client, Project, ReportRow, TimeEntry as ToggleApiTimeEntry}; | ||||||
| use crate::utils; | use crate::utils; | ||||||
| use migration::Condition; | use migration::Condition; | ||||||
| use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; | use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect}; | ||||||
| use tracing::{debug, instrument}; | use tracing::{debug, instrument}; | ||||||
|  | use crate::toggl_api::TogglApiClient; | ||||||
| 
 | 
 | ||||||
| #[instrument(skip_all)] | pub async fn update_database( | ||||||
| pub async fn cache_report( |  | ||||||
|     db: &DatabaseConnection, |     db: &DatabaseConnection, | ||||||
|     models: &Vec<ReportRow>, |     toggl_client: &TogglApiClient, | ||||||
|  |     time_entries: &[ToggleApiTimeEntry], | ||||||
|     exclusive_on: Option<Condition>, |     exclusive_on: Option<Condition>, | ||||||
| ) -> utils::Result<()> { | ) -> utils::Result<()> { | ||||||
|     let models = models.iter().flat_map(|entry| entry.as_models()); |     let (deleted_entries, time_entries) = time_entries | ||||||
|     let models = models.collect::<Vec<_>>(); |  | ||||||
|     let ids = models |  | ||||||
|         .iter() |         .iter() | ||||||
|         .map(|entry| entry.toggl_id.clone().unwrap()) |         .partition::<Vec<_>, _>(|entry| entry.server_deleted_at.is_some()); | ||||||
|  | 
 | ||||||
|  |     let deleted_ids = deleted_entries.iter() | ||||||
|  |         .map(|entry| entry.id) | ||||||
|  |         .collect::<Vec<_>>(); | ||||||
|  | 
 | ||||||
|  |     if !deleted_ids.is_empty() { | ||||||
|  |         debug!("Deleting time entries: {:?}", deleted_ids); | ||||||
|  |         TimeEntry::delete_many() | ||||||
|  |             .filter(time_entry::Column::TogglId.is_in(deleted_ids)) | ||||||
|  |             .exec(db) | ||||||
|  |             .await?; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     let existing_project_ids = project::Entity::find() | ||||||
|  |         .select_only() | ||||||
|  |         .column(project::Column::TogglId) | ||||||
|  |         .into_tuple::<i64>() | ||||||
|  |         .all(db) | ||||||
|  |         .await?; | ||||||
|  | 
 | ||||||
|  |     let new_projects = time_entries | ||||||
|  |         .iter() | ||||||
|  |         .filter_map(|entry| entry.project_id) | ||||||
|  |         .any(|project_id| !existing_project_ids.contains(&(project_id as i64))); | ||||||
|  | 
 | ||||||
|  |     if new_projects { | ||||||
|  |         let clients = toggl_client.fetch_clients().await?; | ||||||
|  | 
 | ||||||
|  |         client::Entity::insert_many(clients.iter().map(Client::as_model)) | ||||||
|  |             .on_conflict(Client::grafting_conflict_statement()) | ||||||
|  |             .exec(db) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         let projects = toggl_client.fetch_projects().await?; | ||||||
|  | 
 | ||||||
|  |         project::Entity::insert_many(projects.iter().map(Project::as_model)) | ||||||
|  |             .on_conflict(Project::grafting_conflict_statement()) | ||||||
|  |             .exec(db) | ||||||
|  |             .await?; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     let ids = time_entries.iter() | ||||||
|  |         .map(|entry| entry.id) | ||||||
|  |         .collect::<Vec<_>>(); | ||||||
|  | 
 | ||||||
|  |     let models = time_entries.into_iter() | ||||||
|  |         .map(|entry| entry.as_model()) | ||||||
|         .collect::<Vec<_>>(); |         .collect::<Vec<_>>(); | ||||||
|     debug!("Caching report entries: {:?}", models); |  | ||||||
| 
 | 
 | ||||||
|     // TODO: Why is this needed?
 |     // TODO: Why is this needed?
 | ||||||
|     if models.is_empty() { |     if models.is_empty() { | ||||||
| @ -26,7 +71,7 @@ pub async fn cache_report( | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     TimeEntry::insert_many(models) |     TimeEntry::insert_many(models) | ||||||
|         .on_conflict(ReportRow::grafting_conflict_statement()) |         .on_conflict(ToggleApiTimeEntry::grafting_conflict_statement()) | ||||||
|         .exec(db) |         .exec(db) | ||||||
|         .await?; |         .await?; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -40,6 +40,10 @@ impl TogglApiClient { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     pub fn workspace_id(&self) -> i64 { | ||||||
|  |         self.workspace_id.parse().unwrap() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     pub async fn check_health(&self) -> bool { |     pub async fn check_health(&self) -> bool { | ||||||
|         true |         true | ||||||
|     } |     } | ||||||
| @ -100,6 +104,15 @@ impl TogglApiClient { | |||||||
|         Ok(clients) |         Ok(clients) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     pub async fn fetch_recent_time_entries(&self) -> crate::Result<Vec<TimeEntry>> { | ||||||
|  |         let url = format!("{base_url}/me/time_entries", base_url = self.base_url); | ||||||
|  | 
 | ||||||
|  |         Ok(self.make_request(self.client.get(url)) | ||||||
|  |             .await? | ||||||
|  |             .json::<Vec<TimeEntry>>() | ||||||
|  |             .await?) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     pub async fn fetch_time_entries_modified_since( |     pub async fn fetch_time_entries_modified_since( | ||||||
|         &self, |         &self, | ||||||
|         date_time: DateTime<Utc>, |         date_time: DateTime<Utc>, | ||||||
| @ -119,7 +132,7 @@ impl TogglApiClient { | |||||||
| 
 | 
 | ||||||
|     pub async fn fetch_time_entries_in_range( |     pub async fn fetch_time_entries_in_range( | ||||||
|         &self, |         &self, | ||||||
|         (start, end): (DateTime<Utc>, DateTime<Utc>), |         start: DateTime<Utc>, end: DateTime<Utc>, | ||||||
|     ) -> crate::Result<Vec<TimeEntry>> { |     ) -> crate::Result<Vec<TimeEntry>> { | ||||||
|         let url = format!("{base_url}/me/time_entries", base_url = self.base_url); |         let url = format!("{base_url}/me/time_entries", base_url = self.base_url); | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -35,7 +35,7 @@ pub struct TimeEntry { | |||||||
|     pub id: i64, |     pub id: i64, | ||||||
|     pub workspace_id: i64, |     pub workspace_id: i64, | ||||||
|     pub project_id: Option<i64>, |     pub project_id: Option<i64>, | ||||||
|     pub task_id: Option<Value>, |     pub task_id: Option<i64>, | ||||||
|     pub billable: bool, |     pub billable: bool, | ||||||
|     pub start: DateTime<Utc>, |     pub start: DateTime<Utc>, | ||||||
|     pub stop: Option<DateTime<Utc>>, |     pub stop: Option<DateTime<Utc>>, | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user