using Azure.Messaging.ServiceBus; using Microsoft.Extensions.Logging; using System; using System.Text.Json; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.Azure.Functions.Worker; using System.Net; using Microsoft.DurableTask; using Microsoft.DurableTask.Client; using FromBodyAttribute = Microsoft.Azure.Functions.Worker.Http.FromBodyAttribute; namespace CDP { public class MetaProcessor { private readonly ILogger _logger; private static Lazy lazyBusClient = new Lazy(InitializeServiceBusClient); private static ServiceBusClient _serviceBusClient = lazyBusClient.Value; public MetaProcessor(ILogger logger) { _logger = logger; } private static ServiceBusClient InitializeServiceBusClient() { return new ServiceBusClient(Constants.SvcBusConnectionString); } public static async Task PublishBatchJobs(List jobs) { //Job[] jobsCopy = jobs.ToArray(); var sender = _serviceBusClient.CreateSender("mail-events-queue"); ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); jobs.ForEach(job => { var msg = new ServiceBusMessage(JsonSerializer.Serialize(job)); bool isAdded = batch.TryAddMessage(msg); }); await sender.SendMessagesAsync(batch); } public static async Task PublishJob(Job job) { var sender = _serviceBusClient.CreateSender("mail-events-queue"); ServiceBusMessage msg = new ServiceBusMessage(JsonSerializer.Serialize(job)); await sender.SendMessageAsync(msg); } [Function("MailMetaProcessor")] public async Task MailMetaProcessor([ActivityTrigger] MailRecord mailRecord, FunctionContext functionContext) { Job job = await TriggerMailProcessor(mailRecord); return job; } [Function("MailMetaOrchestrator")] public async Task MailMetaOrchestrator([OrchestrationTrigger] TaskOrchestrationContext ctx) { try { MailRecord record = ctx.GetInput(); string jobId = await ctx.CallActivityAsync(nameof(AddProtectionAudits), record); Job job = await ctx.CallActivityAsync(nameof(MailMetaProcessor), record); string revId = await ctx.CallActivityAsync(nameof(AddMailCompundRevision), record); return job; } catch (Exception ex) { _logger.LogError(ex.Message); return new Job() { AppKey = "", EventType = JobType.MailMetaProcessing, Id = "failed" }; } } [Function("AddMailMetaDurable")] public async Task AddMailMetaDurable([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, [FromBody] MailRecord recordMail, [DurableClient] DurableTaskClient client) { bool isValid = true; //bool isValid = await CDPLite.CheckJwtRequest(req); if (!isValid) { HttpResponseData res = req.CreateResponse(HttpStatusCode.Unauthorized); return res; } try { string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(MailMetaOrchestrator), recordMail); _logger.LogInformation("created instance: {0}", instanceId); return client.CreateCheckStatusResponse(req, instanceId); } catch (Exception ex) { _logger.LogError(ex.ToString()); HttpResponseData response = req.CreateResponse(HttpStatusCode.InternalServerError); return response; } } [Function("AddMailMeta")] public async Task AddMailMeta([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, [FromBody] MailRecord recordMail) { bool isValid = true; //bool isValid = await CDPLite.CheckJwtRequest(req); if (!isValid) { return new UnauthorizedObjectResult("Jwt has expired please validate"); } try { MailRecord mailRecord = recordMail; //_logger.LogInformation(JsonSerializer.Serialize(recordMail)); string res = await TriggerProtectionAuditsJob(mailRecord); res = await AddMailCompoundRevisionInternal(mailRecord); _logger.LogInformation($"protection audits? {res}"); Job job = await TriggerMailProcessor(mailRecord); return new OkObjectResult(job); } catch (Exception ex) { _logger.LogError(ex.ToString()); return new StatusCodeResult(500); } } [Function("AddMailForwardingRequest")] public async Task AddMailForwardingRequest([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req) { ForwardMailDto forwardMail; try { string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); _logger.LogInformation("requestbody: {0}", requestBody); forwardMail = JsonSerializer.Deserialize(requestBody); } catch (Exception ex) { _logger.LogInformation(ex.ToString()); return new StatusCodeResult(500); } string appKey = forwardMail.AppKey; var userId = Helpers.HashAndShortenText(forwardMail.SenderEmail.ToLower()); var fr = await CDPDB.GetFile(appKey, forwardMail.MailId, userId); if ((!fr.Policy.CheckAccess("Manage")) && (!fr.Policy.CheckAccess("Owner"))) { return new UnauthorizedObjectResult("User doesn't have rights to forward this email"); } RevisionEntry entry = await FileRevision.GetFileRevisionInternal(forwardMail.AppKey, forwardMail.MailId); if (entry.CompoundDocument == null) { return new BadRequestObjectResult("IncorrectMailId"); } MailMetaRecord mailRecord = JsonSerializer.Deserialize(entry.CompoundDocument.Contents); mailRecord.AttachmentDetails.ForEach(async (att) => { forwardMail.ReceiverEmails.ForEach(async (receiver) => { await AddMailForwardPolicy(appKey, att.FileId, att.FileName, forwardMail.SenderEmail, receiver, "Read"); }); }); forwardMail.ReceiverEmails.ForEach(async (receiver) => { await AddMailForwardPolicy(appKey, mailRecord.BodyId, string.Concat(mailRecord.BodyId, "-", "MailBody"), forwardMail.SenderEmail, receiver, "Read"); }); return new OkObjectResult("Done"); } internal async Task AddMailForwardPolicy(string appKey, string fileId, string fileName, string sender, string receiver, string policy = "Read") { AddFileUserDto addFileUserDto = new AddFileUserDto { AppKey = appKey, FileId = fileId, FileName = fileName, Email = sender, EmailToAdd = receiver, Policy = policy }; await CDPLite.AddFileUserInternal(addFileUserDto); } [Function("AddMailCompundRevision")] public async Task AddMailCompundRevision([ActivityTrigger] MailRecord record, FunctionContext context) { return await AddMailCompoundRevisionInternal(record); } internal async Task AddMailCompoundRevisionInternal(MailRecord record) { RevisionDocument doc = new RevisionDocument { RevisionId = Guid.NewGuid().ToString(), RevisionDate = DateTime.UtcNow, EditedBy = record.SenderEmail, Comments = "MailBody" }; MailMetaRecord mailMeta = new MailMetaRecord { AppKey = record.AppKey, SenderEmail = record.SenderEmail, BodyContent = record.BodyContent, BodyId = record.MailId, AttachmentDetails = record.AttachmentDetails }; CompoundDocument compound = new CompoundDocument { Id = Guid.NewGuid().ToString(), AppKey = record.AppKey, Contents = JsonSerializer.Serialize(mailMeta), DocumentType = "MailRecord" }; string revisionId = await FileRevision.AddMailBodyRevision(doc, compound, record.AppKey, record.MailId); return revisionId; } [Function("AddProtectionAudits")] public async Task AddProtectionAudits([ActivityTrigger] MailRecord record, FunctionContext functionContext) { var res = await TriggerProtectionAuditsJob(record); return res; } internal async Task TriggerProtectionAuditsJob(MailRecord record) { try { string userId = Helpers.HashAndShortenText(record.SenderEmail); string message = string.Format($"{record.SenderEmail} protected an email body with id: {record.MailId}."); List auditEvents = new List(); AuditEventMetadata auditEvent = new AuditEventMetadata { Action = "Addded", FileId = record.MailId, FileName = "Outlook-Mail-Body", Message = message, UserId = userId }; auditEvents.Add(auditEvent); record.AttachmentDetails.ForEach((att) => { message = string.Format($"{record.SenderEmail} protected the email attachment {att.FileName} with id: {att.FileId}."); AuditEventMetadata attachmentEvent = new AuditEventMetadata { Action = "Added", FileId = att.FileId, FileName = att.FileName, Message = message, UserId = userId }; auditEvents.Add(attachmentEvent); }); Job job = await CDPLite.AddAuditsBatchedEvent(auditEvents, record.AppKey); await PublishJob(job); _logger.LogInformation(JsonSerializer.Serialize(job)); return "Done"; } catch (Exception ex) { _logger.LogError(ex.ToString()); throw ex; } } internal async Task TriggerMailProcessor(MailRecord mailRecord) { string Connection = Constants.StorageConnString; var blobSvc = new BlobServiceClient(Connection); var blobClient = blobSvc.GetBlobContainerClient(mailRecord.AppKey.ToLower()); if (!blobClient.Exists()) { await blobClient.CreateAsync(); } byte[] fileBytes = System.Text.Encoding.UTF8.GetBytes(JsonSerializer.Serialize(mailRecord)); string jobId = Guid.NewGuid().ToString(); var blob = blobClient.GetBlobClient(jobId); Stream fs = new MemoryStream(fileBytes); try { await blob.UploadAsync(fs); } catch (Exception ex) { _logger.LogError(ex.ToString()); throw ex; } Job job = new Job { AppKey = mailRecord.AppKey, EventType = JobType.MailMetaProcessing, Id = jobId }; await PublishJob(job); return job; } } public class MailMetaRecord { public required string AppKey { get; set; } public required string SenderEmail { get; set; } public required string BodyId { get; set; } public required string BodyContent { get; set; } //encrypted body public required List AttachmentDetails { get; set; } } public class ForwardMailDto { public string SenderEmail { get; set; } public List ReceiverEmails { get; set; } public string MailId { get; set; } public string AppKey { get; set; } } }