Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,305 changes: 577 additions & 728 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ tracing-subscriber = { version = "0.3.19", features = [
"fmt",
] }
uuid = { version = "1.16.0", features = ["serde", "v4"] }
ydb = "0.9.11"
ydb-grpc = "0.1.0"
ydb = "0.10.1"
ydb-grpc = "0.2.0"
headers = "0.4.1"
axum-extra = { version = "0.10.1", features = ["typed-header"] }
axum-client-ip = "1.1.3"
Expand Down
2 changes: 0 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,4 @@ RUN apt-get update && \

COPY --from=builder /usr/src/app/target/release/urlshortener /usr/local/bin/urlshortener

ENV RUST_LOG="trace"

CMD ["urlshortener"]
190 changes: 190 additions & 0 deletions src/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::{sync::Arc, time::Duration};
use ua_parser::Extractor;

use crate::entity::VisitInfo;
use tokio::sync::Mutex;
use tracing::{Level, span};
use ydb::{TableClient, TopicReader, YdbError};

use crate::db;

pub async fn create(
table_path: String,
ua_extractor: Extractor<'static>,
topic_table_client: TableClient,
table_client: TableClient,
reader: TopicReader,
) {
let worker_span = span!(Level::DEBUG, "visit_collect_worker");
let _guard = worker_span.enter();

let reader_mutex = Arc::new(Mutex::new(reader));

let topic_tc = &topic_table_client;
let tc_ref = &table_client;
let table_path_ref = &table_path;
let ua_extractor_ref = &ua_extractor;

loop {
let result = topic_tc
.retry_transaction(|mut t| {
let reader_mutex = reader_mutex.clone();

async move {
let mut reader_guard = reader_mutex.lock().await;

let batch_result = tokio::time::timeout(
Duration::from_secs(3),
reader_guard.pop_batch_in_tx(&mut t),
)
.await;

match batch_result {
Ok(Ok(batch)) => {
tracing::debug!("read batch with {} messages", batch.messages.len());

let mut visits = Vec::<db::VisitData>::new();
for mut message in batch.messages {
let raw_data: Result<Option<Vec<u8>>, YdbError> =
message.read_and_take().await;

let json_bytes =
raw_data.unwrap().ok_or("empty message data").unwrap();
let visit_info: VisitInfo =
match serde_json::from_slice(&json_bytes) {
Ok(visit_info) => visit_info,
Err(err) => {
let s = String::from_utf8_lossy(&json_bytes);

tracing::error!(
"deserialize visit info: {} message: {}",
err,
s
);

continue;
}
};

let mut ua_info = None;
let mut referer = None;
let mut ip = None;

if let Some(info) = visit_info.request_info {
referer = info.referer;
ip = info.ip;
ua_info = parse_ua_info(ua_extractor_ref, info.user_agent);
};

visits.push(db::VisitData {
code: visit_info.link_info.code,
url: visit_info.link_info.url,

ua_info,
referer,
ip,

utm_campaign: visit_info.link_info.utm_campaign,
utm_content: visit_info.link_info.utm_content,
utm_source: visit_info.link_info.utm_source,

event_timestamp: visit_info.created_at,
});
}

if !visits.is_empty() {
tracing::debug!("visits batch: {:?}", &visits);

match db::add_visit(table_path_ref.clone(), tc_ref, visits).await {
Err(err) => {
tracing::error!("add visits: {}", err);
}
_ => {
tracing::debug!("visits added successfully");
}
};
}

t.commit().await?;

tracing::debug!("consumer batch committed successfully");

Ok(true)
}
Ok(Err(err)) => {
tracing::error!("reading batch: {err}");
Err(ydb::YdbOrCustomerError::YDB(err))
}
Err(_timeout_err) => {
tracing::debug!("timeout reading batch - no more messages available");
Ok(false)
}
}
}
})
.await;

match result {
Ok(true) => {}
Ok(false) => {
tracing::debug!("all messages have been read and stored");
}
Err(err) => {
tracing::error!("transaction failed: {err}");
}
}
}
}

fn parse_ua_info(extractor: &Extractor, user_agent: Option<String>) -> Option<db::UaInfo> {
let ua_str = match user_agent {
Some(ua_str) => ua_str,
_ => return None,
};

let ua = &ua_str.clone();
let ext = extractor.extract(ua);

let mut res = db::UaInfo {
user_agent: Some(ua_str),
browser: None,
browser_version: None,
os: None,
os_version: None,
device: None,
};

if let Some(user_agent) = ext.0 {
let user_agent = user_agent.into_owned();
res.browser = Some(user_agent.family);
res.browser_version = Some(format!(
"{}.{}.{}",
user_agent.major.unwrap_or("N/A".to_string()),
user_agent.minor.unwrap_or("N/A".to_string()),
user_agent.patch.unwrap_or("N/A".to_string()),
));
}

if let Some(os) = ext.1 {
let os = os.into_owned();
res.os = Some(os.os);
res.os_version = Some(format!(
"{}.{}.{}",
os.major.unwrap_or("N/A".to_string()),
os.minor.unwrap_or("N/A".to_string()),
os.patch.unwrap_or("N/A".to_string()),
));
};

if let Some(device) = ext.2 {
let d = device.into_owned();
res.device = Some(format!(
"{} {} {}",
d.brand.unwrap_or("N/A".to_string()),
d.device,
d.model.unwrap_or("N/A".to_string()),
));
};

Some(res)
}
12 changes: 6 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::IpAddr;
use chrono::{DateTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use std::time::{Duration, UNIX_EPOCH};
use tracing::trace;
use ydb::{
ClientBuilder, CommandLineCredentials, MetadataUrlCredentials, Query, TableClient, Value,
YdbError, YdbOrCustomerError, YdbResult, ydb_params, ydb_struct,
Expand All @@ -27,8 +28,7 @@ pub async fn init_db(connection_key: String, env: Environment) -> ydb::YdbResult

client.wait().await?;

// https://github.com/ydb-platform/ydb-rs-sdk/issues/371
tokio::time::sleep(Duration::from_millis(100)).await;
trace!("init_db wait finished");

Ok(client)
}
Expand Down Expand Up @@ -218,10 +218,10 @@ pub async fn insert(table_client: &TableClient, data: CreateData) -> ydb::YdbRes

match res {
Err(YdbOrCustomerError::YDB(YdbError::YdbStatusError(status_err))) => {
if let Ok(status_code) = status_err.operation_status() {
if status_code == StatusCode::PreconditionFailed {
return Ok(());
}
if let Ok(status_code) = status_err.operation_status()
&& status_code == StatusCode::PreconditionFailed
{
return Ok(());
}

Err(YdbError::YdbStatusError(status_err))
Expand Down
67 changes: 67 additions & 0 deletions src/entity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::net::IpAddr;

use axum::extract::FromRequestParts;
use axum_client_ip::ClientIp;
use axum_extra::{TypedHeader, headers::Referer, headers::UserAgent};
use chrono::DateTime;
use http::request::Parts;
use serde::{Deserialize, Serialize};

use crate::db;

#[derive(Debug, Serialize, Deserialize)]
pub struct VisitInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub request_info: Option<RequestInfo>,
pub link_info: db::LinkInfo,
pub created_at: DateTime<chrono::Utc>,
}

impl VisitInfo {
pub fn is_empty(&self) -> bool {
self.request_info.is_none()
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RequestInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub referer: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ip: Option<IpAddr>,
}

impl RequestInfo {
pub fn is_empty(&self) -> bool {
self.user_agent.is_none() && self.referer.is_none() && self.ip.is_none()
}
}

impl<S> FromRequestParts<S> for RequestInfo
where
S: Send + Sync,
{
type Rejection = std::convert::Infallible;

async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let user_agent = TypedHeader::<UserAgent>::from_request_parts(parts, _state)
.await
.map(|TypedHeader(ua)| ua.to_string())
.ok();

let referer = TypedHeader::<Referer>::from_request_parts(parts, _state)
.await
.map(|TypedHeader(r)| r.to_string())
.ok();

let ip = parts.extensions.get::<ClientIp>().map(|ClientIp(ip)| *ip);

Ok(RequestInfo {
user_agent,
referer,
ip,
})
}
}
Loading