|
|
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Google.Protobuf.Reflection;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace CDP {
/// <summary>
/// Receives CDP jobs from Service Bus (see <c>ProcessCDPJob</c>) and runs the handler
/// that matches each job's type.
/// </summary>
public class EventProcessor {
    // Container names handed to AuditDB when audit records are appended.
    // They are never reassigned, so they are compile-time constants rather than
    // mutable static fields.
    private const string FileAuditContainer = "FileAudits";
    private const string UserAuditContainer = "UserAudits";
    // Not referenced by any method of this class.
    private const string GroupAuditContainer = "GroupAudits";
    private const string TenantAuditContainer = "TenantAudits";

    // Logger supplied through the constructor.
    private readonly ILogger<EventProcessor> _logger;
/// <summary>Creates the processor and stores the supplied logger in <c>_logger</c>.</summary>
/// <param name="logger">Logger kept for use by this instance.</param>
public EventProcessor(ILogger<EventProcessor> logger) => _logger = logger;
/// <summary>
/// Service Bus entry point: deserializes the message body into a <see cref="Job"/>,
/// dispatches it to the handler matching <see cref="Job.EventType"/>, then completes the message.
/// </summary>
/// <param name="message">Message received from the <c>mail-events-queue</c> queue; its body is parsed as a JSON <see cref="Job"/>.</param>
/// <param name="messageActions">Settlement actions used to complete the message.</param>
/// <remarks>
/// A body that cannot be deserialized is logged and the message is completed, so it is not retried.
/// If a handler throws, the exception leaves this method before the message is completed.
/// The body itself is never logged: for key-vault jobs <see cref="Job.JobMetadata"/> carries
/// <see cref="KeyVaultEvent.AESKey"/>, so logging it would write key material to the log.
/// </remarks>
[Function("ProcessEvent")]
public async Task ProcessCDPJob(
    [ServiceBusTrigger("mail-events-queue", Connection = "ServiceBusConnectionString")] ServiceBusReceivedMessage message,
    ServiceBusMessageActions messageActions)
{
    int bodyLength = message.Body.ToMemory().Length;

    _logger.LogInformation("Message ID: {id}", message.MessageId);
    // Size only; see remarks for why the content is not logged.
    _logger.LogInformation("Message Body: {bodyLength} bytes", bodyLength);
    _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

    Job? cdpJob;
    try
    {
        cdpJob = JsonSerializer.Deserialize<Job>(message.Body);
    }
    catch (Exception ex)
    {
        // Pass the exception as the exception argument so the logger keeps type and stack trace.
        _logger.LogError(
            ex,
            "received incorrect structure in body of message {id} ({bodyLength} bytes)",
            message.MessageId,
            bodyLength);
        await messageActions.CompleteMessageAsync(message);
        return;
    }

    if (cdpJob is null)
    {
        // A JSON literal `null` deserializes without error; there is nothing to dispatch.
        _logger.LogWarning("Message {id} deserialized to a null job; nothing to process", message.MessageId);
    }
    else
    {
        switch (cdpJob.EventType)
        {
            case JobType.MailMetaProcessing:
                await ProcessMailMetaJob(cdpJob);
                break;
            case JobType.KeyVaultInsertion:
                _logger.LogInformation("key vault insertion processing");
                await ProcessKeyVaultInsertionJob(cdpJob);
                break;
            case JobType.AddAudits:
                _logger.LogInformation("Adding cdp audits");
                await ProcessAuditEvent(cdpJob);
                break;
            case JobType.KeyVaultInsertionBatch:
                _logger.LogInformation("batch key vault insertion processing");
                await ProcessKeyVaultBatchJob(cdpJob);
                break;
            case JobType.AddAuditsBatch:
                _logger.LogInformation("batch audit insertion in progress");
                await ProcessAuditsBatchJob(cdpJob);
                break;
            default:
                // Numeric values outside the enum deserialize successfully; make the drop visible.
                _logger.LogWarning(
                    "Message {id} has unhandled job type {eventType}; completing without processing",
                    message.MessageId,
                    cdpJob.EventType);
                break;
        }
    }

    // Complete the message
    await messageActions.CompleteMessageAsync(message);
}
/// <summary>
/// Handles a single audit job: parses <see cref="Job.JobMetadata"/> as
/// <see cref="AuditEventMetadata"/> and forwards its fields to <c>AuditFunctions.AddAudits</c>.
/// </summary>
/// <param name="job">Job whose metadata holds the JSON-encoded audit event.</param>
/// <remarks>Deserialization errors are not caught here; they propagate to the caller.</remarks>
internal async Task ProcessAuditEvent(Job job)
{
    var metadata = JsonSerializer.Deserialize<AuditEventMetadata>(job.JobMetadata);

    await AuditFunctions.AddAudits(
        job.AppKey,
        metadata.FileId,
        metadata.FileName,
        metadata.UserId,
        "", // fifth argument is always empty here — presumably the group id; confirm against AddAudits
        metadata.Action,
        metadata.Message);
}
/// <summary>
/// Handles a batch key-vault job: parses <see cref="Job.JobMetadata"/> as a list of
/// <see cref="KeyVaultEvent"/> and stores each one through <c>KeyVaultService.SetSecretAsync</c>.
/// </summary>
/// <param name="job">Job whose metadata holds the JSON-encoded list of key-vault events.</param>
/// <remarks>
/// Metadata that cannot be parsed (or is empty) is logged and the method returns normally.
/// Failures while storing a secret are not caught: they propagate so the caller does not
/// treat the job as done, matching <c>ProcessKeyVaultInsertionJob</c>.
/// </remarks>
internal async Task ProcessKeyVaultBatchJob(Job job)
{
    List<KeyVaultEvent>? events;
    try
    {
        events = JsonSerializer.Deserialize<List<KeyVaultEvent>>(job.JobMetadata);
    }
    catch (Exception ex)
    {
        // Log the job id only: the serialized job contains the AES keys.
        // Placeholders are bound by order of appearance, so named placeholders avoid index mix-ups.
        _logger.LogError(ex, "Error parsing job: {jobId} keyvault event metadata", job.Id);
        return;
    }

    if (events is null || events.Count == 0)
    {
        _logger.LogWarning("Key vault batch job {jobId} contained no events", job.Id);
        return;
    }

    KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);

    // Await every insertion. List.ForEach with an async lambda would produce async-void
    // delegates, letting this method return before any secret is written.
    foreach (KeyVaultEvent evt in events)
    {
        await kvs.SetSecretAsync(evt.FileId, evt.AESKey);
    }
}
/// <summary>
/// Handles a batch audit job: parses <see cref="Job.JobMetadata"/> as a list of
/// <see cref="AuditEventMetadata"/>, appends one file audit record per event, then appends
/// the tenant records under the job's AppKey and the user records under each record's UserId.
/// </summary>
/// <param name="job">Job whose metadata holds the JSON-encoded list of audit events.</param>
/// <remarks>
/// Best effort: any exception is logged (with the exception attached) and swallowed, so the
/// caller continues normally. An empty or null list is logged and skipped.
/// </remarks>
internal async Task ProcessAuditsBatchJob(Job job)
{
    try
    {
        List<AuditEventMetadata>? auditEvents =
            JsonSerializer.Deserialize<List<AuditEventMetadata>>(job.JobMetadata);

        if (auditEvents is null || auditEvents.Count == 0)
        {
            // Previously an empty batch failed on userAuditRecords[0]; it is simply a no-op.
            _logger.LogWarning("Audit batch job {jobId} contained no audit events", job.Id);
            return;
        }

        var tenantAuditRecords = new List<AuditRecord>(auditEvents.Count);
        var userAuditRecords = new List<AuditRecord>(auditEvents.Count);

        foreach (AuditEventMetadata auditEvent in auditEvents)
        {
            FileAuditRecord fileAuditRecord = new FileAuditRecord
            {
                Action = auditEvent.Action,
                FileId = auditEvent.FileId,
                FileName = auditEvent.FileName,
                Message = auditEvent.Message,
                UserId = auditEvent.UserId,
                GroupId = ""
            };
            await AuditDB.AppendRecord(fileAuditRecord.id, fileAuditRecord, FileAuditContainer);

            // One timestamp per event so the tenant and user records agree.
            DateTime eventTime = DateTime.UtcNow;

            tenantAuditRecords.Add(new TenantAuditRecord
            {
                AppKey = job.AppKey,
                Action = auditEvent.Action,
                FileId = auditEvent.FileId,
                FileName = auditEvent.FileName,
                Message = auditEvent.Message,
                UserId = auditEvent.UserId,
                GroupId = "",
                EventTime = eventTime,
            });

            userAuditRecords.Add(new UserAuditRecord
            {
                AppKey = job.AppKey,
                FileId = auditEvent.FileId,
                FileName = auditEvent.FileName,
                Message = auditEvent.Message,
                UserId = auditEvent.UserId,
                Action = auditEvent.Action,
                EventTime = eventTime,
                GroupId = ""
            });
        }

        await AuditDB.AppendRecordsList(job.AppKey, tenantAuditRecords, TenantAuditContainer);

        // Append each user's records under that user's id instead of filing the whole batch
        // under the first record's UserId. A single-user batch produces exactly one call,
        // with the same key and list as before.
        // NOTE(review): assumes the first argument of AppendRecordsList identifies the user
        // the records belong to — confirm against AuditDB.
        foreach (var recordsForUser in userAuditRecords.GroupBy(record => record.UserId))
        {
            await AuditDB.AppendRecordsList(recordsForUser.Key, recordsForUser.ToList(), UserAuditContainer);
        }
    }
    catch (Exception ex)
    {
        // Attach the exception so type and stack trace are kept, not just the message text.
        _logger.LogError(ex, "Failed to process audit batch job {jobId}", job.Id);
    }
}
/// <summary>
/// Handles a single key-vault job: parses <see cref="Job.JobMetadata"/> as a
/// <see cref="KeyVaultEvent"/> and stores it through <c>KeyVaultService.SetSecretAsync</c>.
/// </summary>
/// <param name="cdpJob">Job whose metadata holds the JSON-encoded key-vault event.</param>
/// <remarks>
/// Metadata that cannot be parsed, or that deserializes to null, is logged and the method
/// returns normally. Failures from <c>SetSecretAsync</c> propagate to the caller.
/// </remarks>
internal async Task ProcessKeyVaultInsertionJob(Job cdpJob)
{
    KeyVaultEvent? vaultEvent;
    try
    {
        vaultEvent = JsonSerializer.Deserialize<KeyVaultEvent>(cdpJob.JobMetadata);
    }
    catch (Exception ex)
    {
        _logger.LogError("Error parsing keyvault event metadata error: {0}", ex.ToString());
        return;
    }

    if (vaultEvent is null)
    {
        // A JSON literal `null` deserializes without throwing; handle it like any other
        // unusable payload instead of failing with a NullReferenceException below.
        _logger.LogError("Error parsing keyvault event metadata error: {0}", "metadata deserialized to null");
        return;
    }

    // Created only once there is something to store.
    KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);
    await kvs.SetSecretAsync(vaultEvent.FileId, vaultEvent.AESKey);
}
/// <summary>
/// Handles a mail-metadata job: downloads the blob named after the job id from the container
/// named after the lower-cased AppKey, parses it as a <see cref="MailRecord"/> and passes it to
/// <c>MailProcessor.ProcessMailEvent</c>.
/// </summary>
/// <param name="cdpJob">Job supplying the container name (AppKey) and blob name (Id).</param>
/// <remarks>Returns without doing anything when the blob does not exist.</remarks>
internal async Task ProcessMailMetaJob(Job cdpJob)
{
    var blobServiceClient = new BlobServiceClient(Constants.StorageConnString);

    // Invariant casing so the container name does not depend on the host's current culture
    // (ToLower() maps 'I' differently under, e.g., Turkish cultures).
    var containerClient = blobServiceClient.GetBlobContainerClient(cdpJob.AppKey.ToLowerInvariant());
    var blob = containerClient.GetBlobClient(string.Format("{0}", cdpJob.Id));

    var exists = await blob.ExistsAsync();
    if (!exists)
    {
        return;
    }

    // BlobDownloadInfo is IDisposable; dispose it when the method exits so the download
    // stream is released.
    using BlobDownloadInfo blobDownloadInfo = await blob.DownloadAsync();
    _logger.LogDebug($"Exists?: {exists.ToString()}");
    _logger.LogDebug($"Download resp: {blobDownloadInfo.Details}");

    using var buffer = new MemoryStream();
    await blobDownloadInfo.Content.CopyToAsync(buffer);
    string mailRecJson = System.Text.Encoding.UTF8.GetString(buffer.ToArray());

    // NOTE(review): this writes the full mail record JSON to the log at Information level —
    // confirm that is acceptable for the data it contains.
    _logger.LogInformation("got event json from blob: " + mailRecJson);

    MailRecord record = JsonSerializer.Deserialize<MailRecord>(mailRecJson);
    await MailProcessor.ProcessMailEvent(record, _logger);
}
} // end of class EventProcessor
/// <summary>
/// Mail record deserialized from the blob JSON in <c>ProcessMailMetaJob</c> and handed to
/// <c>MailProcessor.ProcessMailEvent</c>. Every member must be present in the JSON.
/// </summary>
public class MailRecord
{
    public required string AppKey { get; set; }

    public required string SenderEmail { get; set; }

    /// <summary>Despite the name, this is the body id.</summary>
    public required string MailId { get; set; }

    /// <summary>The body content.</summary>
    public required string BodyContent { get; set; }

    public required List<string> Attachments { get; set; }

    public required List<string> ReceiverEmails { get; set; }

    public required List<AttachmentDetails> AttachmentDetails { get; set; }
}
/// <summary>
/// One entry of <c>MailRecord.AttachmentDetails</c>. All members except
/// <see cref="FileType"/> are required.
/// </summary>
public class AttachmentDetails
{
    public required string FileId { get; set; }

    public required string HeadRevisionId { get; set; }

    /// <summary>Optional; null when not supplied.</summary>
    public string? FileType { get; set; }

    public required string FileName { get; set; }
}
/// <summary>
/// Unit of work carried in a Service Bus message body and deserialized by
/// <c>EventProcessor.ProcessCDPJob</c>.
/// </summary>
public class Job
{
    /// <summary>Job identifier; <c>ProcessMailMetaJob</c> uses it as the blob name.</summary>
    public required string Id { get; set; }

    /// <summary>
    /// Application key; lower-cased it names the blob container in <c>ProcessMailMetaJob</c>,
    /// and it is passed along with audit records.
    /// </summary>
    public required string AppKey { get; set; }

    /// <summary>Selects which handler processes the job.</summary>
    public required JobType EventType { get; set; }

    /// <summary>
    /// Optional JSON payload; the handler chosen by <see cref="EventType"/> decides which
    /// type it is deserialized into.
    /// </summary>
    public string? JobMetadata { get; set; }
}
/// <summary>
/// Kinds of job dispatched by <c>EventProcessor.ProcessCDPJob</c>. The numeric values are
/// written out explicitly; they equal the values the compiler assigned implicitly.
/// </summary>
public enum JobType
{
    MailMetaProcessing = 0,
    KeyVaultInsertion = 1,
    AddAudits = 2,
    KeyVaultInsertionBatch = 3,
    AddAuditsBatch = 4
}

/// <summary>
/// Payload of the key-vault jobs; its two members are the arguments passed to
/// <c>KeyVaultService.SetSecretAsync</c>.
/// </summary>
public class KeyVaultEvent
{
    public required string FileId { get; set; }

    public required string AESKey { get; set; }
}
/// <summary>
/// Payload of the audit jobs, deserialized from <c>Job.JobMetadata</c>. Only
/// <see cref="FileId"/> is required; the other members stay null when absent from the JSON.
/// </summary>
public class AuditEventMetadata
{
    public required string FileId { get; set; }

    public string FileName { get; set; }

    public string UserId { get; set; }

    /// <summary>
    /// Not read by the handlers in <c>EventProcessor</c>, which write an empty group id instead.
    /// </summary>
    public string GroupId { get; set; }

    public string Action { get; set; }

    public string Message { get; set; }
}
} // end of namespace CDP
|