Temporary repo to track my changes on LTS functions app porting
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
10 KiB

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Google.Protobuf.Reflection;
namespace CDP
{
public class EventProcessor
{
private static string FileAuditContainer = "FileAudits";
private static string UserAuditContainer = "UserAudits";
private static string GroupAuditContainer = "GroupAudits";
private static string TenantAuditContainer = "TenantAudits";
private readonly ILogger<EventProcessor> _logger;
public EventProcessor(ILogger<EventProcessor> logger)
{
_logger = logger;
}
[Function("ProcessEvent")]
public async Task ProcessCDPJob(
[ServiceBusTrigger("mail-events-queue", Connection = "ServiceBusConnectionString")]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
Job cdpJob;
try
{
cdpJob = JsonSerializer.Deserialize<Job>(message.Body);
}
catch (Exception ex)
{
_logger.LogError("received incorrect structure in body: {0} got exception: {1}", message.Body, ex);
await messageActions.CompleteMessageAsync(message);
return;
}
if (cdpJob != null)
{
switch (cdpJob.EventType)
{
case JobType.MailMetaProcessing:
await ProcessMailMetaJob(cdpJob);
break;
case JobType.KeyVaultInsertion:
_logger.LogInformation("key vault insertion processing");
await ProcessKeyVaultInsertionJob(cdpJob);
break;
case JobType.AddAudits:
_logger.LogInformation("Adding cdp audits");
await ProcessAuditEvent(cdpJob);
break;
case JobType.KeyVaultInsertionBatch:
_logger.LogInformation("batch key vault insertion processing");
await ProcessKeyVaultBatchJob(cdpJob);
break;
case JobType.AddAuditsBatch:
_logger.LogInformation("batch audit insertion in progress");
await ProcessAuditsBatchJob(cdpJob);
break;
default:
break;
}
}
// Complete the message
await messageActions.CompleteMessageAsync(message);
}
internal async Task ProcessAuditEvent(Job job)
{
AuditEventMetadata auditEventMetadata = JsonSerializer.Deserialize<AuditEventMetadata>(job.JobMetadata);
await CDPLite.AddAudits(job.AppKey, auditEventMetadata.FileId, auditEventMetadata.FileName, auditEventMetadata.UserId, "", auditEventMetadata.Action, auditEventMetadata.Message);
return;
}
internal async Task ProcessKeyVaultBatchJob(Job job)
{
KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);
List<KeyVaultEvent> events = new List<KeyVaultEvent>();
try
{
events = JsonSerializer.Deserialize<List<KeyVaultEvent>>(job.JobMetadata);
events.ForEach(async (KeyVaultEvent evt) =>
{
await kvs.SetSecretAsync(evt.FileId, evt.AESKey);
});
} catch (Exception ex)
{
_logger.LogError("Error parsing job: {1} keyvault event metadata error: {0}", ex.ToString(), JsonSerializer.Serialize(job));
return;
}
}
internal async Task ProcessAuditsBatchJob(Job job)
{
try
{
List<AuditRecord> tenantAuditRecords = new List<AuditRecord>();
List<AuditRecord> userAuditRecords = new List<AuditRecord>();
List<AuditEventMetadata> auditEventMetadata = JsonSerializer.Deserialize<List<AuditEventMetadata>>(job.JobMetadata);
for(int i = 0; i<auditEventMetadata.Count; i++)
{
FileAuditRecord fileAuditRecord = new FileAuditRecord
{
Action = auditEventMetadata[i].Action,
FileId = auditEventMetadata[i].FileId,
FileName = auditEventMetadata[i].FileName,
Message = auditEventMetadata[i].Message,
UserId = auditEventMetadata[i].UserId,
GroupId = ""
};
await AuditDB.AppendRecord(fileAuditRecord.id, fileAuditRecord, FileAuditContainer);
TenantAuditRecord auditRecord = new TenantAuditRecord
{
AppKey = job.AppKey,
Action = auditEventMetadata[i].Action,
FileId = auditEventMetadata[i].FileId,
FileName = auditEventMetadata[i].FileName,
Message = auditEventMetadata[i].Message,
UserId = auditEventMetadata[i].UserId,
GroupId = "",
EventTime = DateTime.UtcNow,
};
tenantAuditRecords.Add(auditRecord);
UserAuditRecord userAuditRecord = new UserAuditRecord
{
AppKey = job.AppKey,
FileId = auditEventMetadata[i].FileId,
FileName = auditEventMetadata[i].FileName,
Message = auditEventMetadata[i].Message,
UserId = auditEventMetadata[i].UserId,
Action = auditEventMetadata[i].Action,
EventTime = DateTime.UtcNow,
GroupId = ""
};
userAuditRecords.Add(userAuditRecord);
}
await AuditDB.AppendRecordsList(job.AppKey, tenantAuditRecords, TenantAuditContainer);
await AuditDB.AppendRecordsList(userAuditRecords[0].UserId, userAuditRecords, UserAuditContainer);
}
catch (Exception ex)
{
_logger.LogError($"{ex.Message}");
}
return;
}
internal async Task ProcessKeyVaultInsertionJob(Job cdpJob)
{
KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);
KeyVaultEvent vaultEvent;
try
{
vaultEvent = JsonSerializer.Deserialize<KeyVaultEvent>(cdpJob.JobMetadata);
} catch (Exception ex)
{
_logger.LogError("Error parsing keyvault event metadata error: {0}",ex.ToString());
return;
}
await kvs.SetSecretAsync(vaultEvent.FileId, vaultEvent.AESKey);
return;
}
internal async Task ProcessMailMetaJob(Job cdpJob)
{
string Connection = Constants.StorageConnString;
var blobServiceClient = new BlobServiceClient(Connection);
var blobClient = blobServiceClient.GetBlobContainerClient(cdpJob.AppKey.ToLower());
var blob = blobClient.GetBlobClient(string.Format("{0}", cdpJob.Id));
var exists = await blob.ExistsAsync();
if (!exists)
{
return;
}
BlobDownloadInfo blobDownloadInfo = await blob.DownloadAsync();
_logger.LogDebug($"Exists?: {exists.ToString()}");
_logger.LogDebug($"Download resp: {blobDownloadInfo.Details}");
using (MemoryStream ms = new MemoryStream())
{
await blobDownloadInfo.Content.CopyToAsync(ms);
byte[] byteArray = ms.ToArray();
string mailRecJson = System.Text.Encoding.UTF8.GetString(byteArray);
_logger.LogInformation("got event json from blob: " + mailRecJson);
MailRecord record = JsonSerializer.Deserialize<MailRecord>(mailRecJson);
await MailProcessor.ProcessMailEvent(record, _logger);
}
return;
}
}
public class MailRecord
{
public required string AppKey { get; set; }
public required string MailId { get; set; }
public required string SenderEmail { get; set; }
public required List<string> Attachments { get; set; }
public required List<string> ReceiverEmails { get; set; }
public required List<AttachmentDetails> AttachmentDetails { get; set; }
}
public class AttachmentDetails
{
public required string FileId { get; set; }
public required string HeadRevisionId { get; set; }
public string? FileType { get; set; }
public required string FileName { get; set; }
}
public class Job
{
public required string Id { get; set; }
public required string AppKey { get; set; }
public required JobType EventType { get; set; }
public string? JobMetadata { get; set; }
}
public enum JobType
{
MailMetaProcessing,
KeyVaultInsertion,
AddAudits,
KeyVaultInsertionBatch,
AddAuditsBatch
}
public class KeyVaultEvent
{
public required string FileId { get; set; }
public required string AESKey { get; set; }
}
public class AuditEventMetadata
{
public required string FileId { get; set; }
public string FileName { get; set; }
public string UserId { get; set; }
public string GroupId { get; set; }
public string Action { get; set; }
public string Message { get; set; }
}
}