using System; using System.Text.Json; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using Azure.Messaging.ServiceBus; using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; using Google.Protobuf.Reflection; namespace CDP { public class EventProcessor { private static string FileAuditContainer = "FileAudits"; private static string UserAuditContainer = "UserAudits"; private static string GroupAuditContainer = "GroupAudits"; private static string TenantAuditContainer = "TenantAudits"; private readonly ILogger _logger; public EventProcessor(ILogger logger) { _logger = logger; } [Function("ProcessEvent")] public async Task ProcessCDPJob( [ServiceBusTrigger("mail-events-queue", Connection = "ServiceBusConnectionString")] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions) { _logger.LogInformation("Message ID: {id}", message.MessageId); _logger.LogInformation("Message Body: {body}", message.Body); _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType); Job cdpJob; try { cdpJob = JsonSerializer.Deserialize(message.Body); } catch (Exception ex) { _logger.LogError("received incorrect structure in body: {0} got exception: {1}", message.Body, ex); await messageActions.CompleteMessageAsync(message); return; } if (cdpJob != null) { switch (cdpJob.EventType) { case JobType.MailMetaProcessing: await ProcessMailMetaJob(cdpJob); break; case JobType.KeyVaultInsertion: _logger.LogInformation("key vault insertion processing"); await ProcessKeyVaultInsertionJob(cdpJob); break; case JobType.AddAudits: _logger.LogInformation("Adding cdp audits"); await ProcessAuditEvent(cdpJob); break; case JobType.KeyVaultInsertionBatch: _logger.LogInformation("batch key vault insertion processing"); await ProcessKeyVaultBatchJob(cdpJob); break; case JobType.AddAuditsBatch: _logger.LogInformation("batch audit insertion in progress"); await ProcessAuditsBatchJob(cdpJob); break; default: break; } } // Complete the message await messageActions.CompleteMessageAsync(message); } internal async Task ProcessAuditEvent(Job job) { AuditEventMetadata auditEventMetadata = JsonSerializer.Deserialize(job.JobMetadata); await AuditFunctions.AddAudits(job.AppKey, auditEventMetadata.FileId, auditEventMetadata.FileName, auditEventMetadata.UserId, "", auditEventMetadata.Action, auditEventMetadata.Message); return; } internal async Task ProcessKeyVaultBatchJob(Job job) { KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI); List events = new List(); try { events = JsonSerializer.Deserialize>(job.JobMetadata); events.ForEach(async (KeyVaultEvent evt) => { await kvs.SetSecretAsync(evt.FileId, evt.AESKey); }); } catch (Exception ex) { _logger.LogError("Error parsing job: {1} keyvault event metadata error: {0}", ex.ToString(), JsonSerializer.Serialize(job)); return; } } internal async Task ProcessAuditsBatchJob(Job job) { try { List tenantAuditRecords = new List(); List userAuditRecords = new List(); List auditEventMetadata = JsonSerializer.Deserialize>(job.JobMetadata); for(int i = 0; i(cdpJob.JobMetadata); } catch (Exception ex) { _logger.LogError("Error parsing keyvault event metadata error: {0}",ex.ToString()); return; } await kvs.SetSecretAsync(vaultEvent.FileId, vaultEvent.AESKey); return; } internal async Task ProcessMailMetaJob(Job cdpJob) { string Connection = Constants.StorageConnString; var blobServiceClient = new BlobServiceClient(Connection); var blobClient = blobServiceClient.GetBlobContainerClient(cdpJob.AppKey.ToLower()); var blob = blobClient.GetBlobClient(string.Format("{0}", cdpJob.Id)); var exists = await blob.ExistsAsync(); if (!exists) { return; } BlobDownloadInfo blobDownloadInfo = await blob.DownloadAsync(); _logger.LogDebug($"Exists?: {exists.ToString()}"); _logger.LogDebug($"Download resp: {blobDownloadInfo.Details}"); using (MemoryStream ms = new MemoryStream()) { await blobDownloadInfo.Content.CopyToAsync(ms); byte[] byteArray = ms.ToArray(); string mailRecJson = System.Text.Encoding.UTF8.GetString(byteArray); _logger.LogInformation("got event json from blob: " + mailRecJson); MailRecord record = JsonSerializer.Deserialize(mailRecJson); await MailProcessor.ProcessMailEvent(record, _logger); } return; } } public class MailRecord { public required string AppKey { get; set; } public required string SenderEmail { get; set; } public required string MailId { get; set; } // this is actually body Id public required string BodyContent { get; set; } // this is body content public required List Attachments { get; set; } public required List ReceiverEmails { get; set; } public required List AttachmentDetails { get; set; } } public class AttachmentDetails { public required string FileId { get; set; } public required string HeadRevisionId { get; set; } public string? FileType { get; set; } public required string FileName { get; set; } } public class Job { public required string Id { get; set; } public required string AppKey { get; set; } public required JobType EventType { get; set; } public string? JobMetadata { get; set; } } public enum JobType { MailMetaProcessing, KeyVaultInsertion, AddAudits, KeyVaultInsertionBatch, AddAuditsBatch } public class KeyVaultEvent { public required string FileId { get; set; } public required string AESKey { get; set; } } public class AuditEventMetadata { public required string FileId { get; set; } public string FileName { get; set; } public string UserId { get; set; } public string GroupId { get; set; } public string Action { get; set; } public string Message { get; set; } } }