use std::sync::Arc;

use anyhow::Result;
use azure_storage_blobs::prelude::{BlobServiceClient, ContainerClient};
use spin_core::async_trait;
use spin_factor_blobstore::{Container, ContainerManager, Error};

pub mod auth;
mod incoming_data;
mod object_names;

use auth::AzureBlobAuthOptions;
use incoming_data::AzureIncomingData;
use object_names::AzureObjectNames;

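/// A blob-store [`ContainerManager`] backed by Azure Blob Storage.
///
/// Holds a single `BlobServiceClient`; per-container clients are created on
/// demand in [`ContainerManager::get`].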
pub struct AzureContainerManager {
    client: BlobServiceClient,
}

impl AzureContainerManager {
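    /// Creates a manager from the given authentication options.
    ///
    /// A minimal usage sketch, assuming credentials come from the
    /// `STORAGE_ACCOUNT` / `STORAGE_ACCESS_KEY` environment variables;
    /// `"my-container"` is a hypothetical container name:
    ///
    /// ```ignore
    /// let manager = AzureContainerManager::new(AzureBlobAuthOptions::Environmental)?;
    /// let container = manager.get("my-container").await?;
    /// ```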
    pub fn new(auth_options: AzureBlobAuthOptions) -> Result<Self> {
        let (account, credentials) = match auth_options {
            AzureBlobAuthOptions::AccountKey(config) => (
                config.account.clone(),
                azure_storage::StorageCredentials::access_key(&config.account, config.key.clone()),
            ),
            AzureBlobAuthOptions::Environmental => {
                let account = std::env::var("STORAGE_ACCOUNT").expect("missing STORAGE_ACCOUNT");
                let access_key =
                    std::env::var("STORAGE_ACCESS_KEY").expect("missing STORAGE_ACCESS_KEY");
                (
                    account.clone(),
                    azure_storage::StorageCredentials::access_key(account, access_key),
                )
            }
        };

        let client = azure_storage_blobs::prelude::ClientBuilder::new(account, credentials)
            .blob_service_client();
        Ok(Self { client })
    }
}

#[async_trait]
impl ContainerManager for AzureContainerManager {
    async fn get(&self, name: &str) -> Result<Arc<dyn Container>, Error> {
        Ok(Arc::new(AzureContainer {
            _label: name.to_owned(),
            client: self.client.container_client(name),
        }))
    }

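    // Every name is treated as defined: the container is only looked up in Azure
    // when it is actually used (see `AzureContainer::exists`).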
    fn is_defined(&self, _store_name: &str) -> bool {
        true
    }
}

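/// A single Azure Blob Storage container, accessed through a `ContainerClient`.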
struct AzureContainer {
    _label: String,
    client: ContainerClient,
}

/// Azure doesn't provide us with a container creation time
const DUMMY_CREATED_AT: u64 = 0;

#[async_trait]
impl Container for AzureContainer {
    async fn exists(&self) -> anyhow::Result<bool> {
        Ok(self.client.exists().await?)
    }

    async fn name(&self) -> String {
        self.client.container_name().to_owned()
    }

    async fn info(&self) -> anyhow::Result<spin_factor_blobstore::ContainerMetadata> {
        let properties = self.client.get_properties().await?;
        Ok(spin_factor_blobstore::ContainerMetadata {
            name: properties.container.name,
            created_at: DUMMY_CREATED_AT,
        })
    }

    async fn clear(&self) -> anyhow::Result<()> {
        anyhow::bail!("Azure blob storage does not support clearing containers")
    }

    async fn delete_object(&self, name: &str) -> anyhow::Result<()> {
        self.client.blob_client(name).delete().await?;
        Ok(())
    }

    async fn delete_objects(&self, names: &[String]) -> anyhow::Result<()> {
        // TODO: are atomic semantics required? or efficiency guarantees?
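        // Deletes are issued concurrently; `try_join_all` surfaces the first error
        // but does not roll back deletions that have already succeeded.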
        let futures = names.iter().map(|name| self.delete_object(name));
        futures::future::try_join_all(futures).await?;
        Ok(())
    }

    async fn has_object(&self, name: &str) -> anyhow::Result<bool> {
        Ok(self.client.blob_client(name).exists().await?)
    }

    async fn object_info(
        &self,
        name: &str,
    ) -> anyhow::Result<spin_factor_blobstore::ObjectMetadata> {
        let response = self.client.blob_client(name).get_properties().await?;
        Ok(spin_factor_blobstore::ObjectMetadata {
            name: name.to_string(),
            container: self.client.container_name().to_string(),
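            // Blob creation times are always after the Unix epoch, so the
            // i64 -> u64 conversion below cannot fail in practice.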
            created_at: response
                .blob
                .properties
                .creation_time
                .unix_timestamp()
                .try_into()
                .unwrap(),
            size: response.blob.properties.content_length,
        })
    }

    async fn get_data(
        &self,
        name: &str,
        start: u64,
        end: u64,
    ) -> anyhow::Result<Box<dyn spin_factor_blobstore::IncomingData>> {
        // We can't use a Rust range because the Azure type does not accept inclusive ranges,
        // and we don't want to add 1 to `end` if it's already at MAX!
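        // For example, start=0, end=9 maps to the half-open Azure range 0..10,
        // while end=u64::MAX means "from `start` to the end of the blob".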
        let range = if end == u64::MAX {
            azure_core::request_options::Range::RangeFrom(start..)
        } else {
            azure_core::request_options::Range::Range(start..(end + 1))
        };
        let client = self.client.blob_client(name);
        Ok(Box::new(AzureIncomingData::new(client, range)))
    }

    async fn write_data(
        &self,
        name: &str,
        data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
        finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>,
    ) -> anyhow::Result<()> {
        let client = self.client.blob_client(name);

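        // Run the upload on a background task so the caller can keep feeding the
        // stream; completion (or failure) is reported through `finished_tx`.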
        tokio::spawn(async move {
            let write_result = Self::write_data_core(data, client).await;
            finished_tx
                .send(write_result)
                .await
                .expect("should send finish tx");
        });

        Ok(())
    }

    async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
        let stm = self.client.list_blobs().into_stream();
        Ok(Box::new(AzureObjectNames::new(stm)))
    }
}

impl AzureContainer {
    async fn write_data_core(
        mut data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
        client: azure_storage_blobs::prelude::BlobClient,
    ) -> anyhow::Result<()> {
        use tokio::io::AsyncReadExt;

        // Azure limits us to 50k blocks per blob.  At 2MB/block that allows 100GB, which will be
        // enough for most use cases.  If users need flexibility for larger blobs, we could make
        // the block size configurable via the runtime config ("size hint" or something).
        const BLOCK_SIZE: usize = 2 * 1024 * 1024;

        let mut blocks = vec![];

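        // Outer loop: one iteration per block. The inner loop keeps reading from
        // the stream until it has a full block (or hits end of stream), then
        // uploads it as an uncommitted block and records its ID for the block list.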
        'put_blocks: loop {
            let mut bytes = Vec::with_capacity(BLOCK_SIZE);
            loop {
                let read = data.read_buf(&mut bytes).await?;
                let len = bytes.len();

                if read == 0 {
                    // end of stream - send the last block and go
                    let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                    let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                    client.put_block(block_id.clone(), bytes).await?;
                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
                        block_id,
                    ));
                    break 'put_blocks;
                }
                if len >= BLOCK_SIZE {
                    let id_bytes = uuid::Uuid::new_v4().as_bytes().to_vec();
                    let block_id = azure_storage_blobs::prelude::BlockId::new(id_bytes);
                    client.put_block(block_id.clone(), bytes).await?;
                    blocks.push(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
                        block_id,
                    ));
                    break;
                }
            }
        }

        let block_list = azure_storage_blobs::blob::BlockList { blocks };
        client.put_block_list(block_list).await?;

        Ok(())
    }
}