Learning how to deploy enterprise-grade pipelines in 5 months.
I had fun.
In April, a very nice chap offered to get me into AWS Activate. This sounded fun, so I said yes.
In June and July, I deployed the first version of datamule cloud, moving code that ran off a spare laptop onto EC2 instances. I also deployed a couple of new datasets (proxy voting records, institutional holdings, etc.) that updated daily using ECS Fargate. It was fun, but I had no idea what I was doing.
Here’s my first systems architecture. Very theoretical, not at all practical. Here’s what was actually implemented.
In late September, I decided to write version 2 because a very nice CEO asked how he could give me money, and I had no answer. My first cloud was built to serve the needs of extremely cost-conscious researchers and startups, not enterprises.
So, I decided to make something enterprise ready.
Design Philosophy
- I need to be able to deploy new products quickly and easily.
- Users need to be able to ingest data quickly and easily.
- The systems architecture needs to be understandable.
This is because, between datamule-python and txt2dataset, I have thousands of datasets to deploy. Also, I am a team of one, with the goal of making every scrap of information in the SEC accessible. So things need to work, be cheap, and be easy to fix and modify.
I settled on an architecture centered around S3.
Pipelines that run daily do the bulk of the lifting for fresh starts, while instant pipelines run constantly and keep everything up to date in real time. All intermediate steps and progress are stored in S3.
Get John Friedman’s stories in your inbox
Daily
- Every day, EventBridge triggers a Step Functions pipeline. The pipeline chains smaller Step Functions pipelines for greater modularity and compartmentalization. Each component pipeline runs ECS Fargate tasks, with a Lambda orchestrator that passes along information, such as batch assignments, when we want to run tasks concurrently.
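As a rough sketch of how a master pipeline could chain component pipelines, here is a Python helper that builds an Amazon States Language definition in which each state starts a child state machine and waits for it to finish (the `startExecution.sync:2` service integration). The account ID, region, ARN scheme, and the helper itself are hypothetical placeholders; the component names mirror the files in `steps/component_pipelines/`.

```python
import json

# Placeholder account and region; substitute real values.
ACCOUNT_ID = "123456789012"
REGION = "us-east-1"


def state_machine_arn(name: str) -> str:
    """Build a (hypothetical) ARN for a component state machine."""
    return f"arn:aws:states:{REGION}:{ACCOUNT_ID}:stateMachine:{name}"


def build_master_pipeline(components: list[str]) -> dict:
    """Chain component state machines so each runs only after the previous one finishes."""
    if not components:
        raise ValueError("need at least one component pipeline")
    states = {}
    for i, name in enumerate(components):
        state = {
            "Type": "Task",
            # .sync:2 makes the parent wait for the child execution to complete
            "Resource": "arn:aws:states:::states:startExecution.sync:2",
            "Parameters": {"StateMachineArn": state_machine_arn(name)},
        }
        if i + 1 < len(components):
            state["Next"] = components[i + 1]
        else:
            state["End"] = True
        states[name] = state
    return {"StartAt": components[0], "States": states}


daily_pipeline = build_master_pipeline(
    ["daily_setup", "daily_s3", "daily_database_s3", "daily_database_mysql"]
)
print(json.dumps(daily_pipeline, indent=2))
```

Because each child is its own state machine with its own execution history, a failure shows up as one failed nested execution, which fits the compartmentalization goal.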
Instant
- Simultaneously, I run EC2 instances that update the data in real time.
- These instances update S3 nightly with the accessions they have processed, so that the daily pipeline doesn’t duplicate their work.
A neat feature is that, rather than enabling snapshots for our databases, we can use the existing S3 layer as the backup!
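The deduplication amounts to set arithmetic. A minimal sketch, assuming each accession list (such as `instant_sgml_archive.txt.zst`) is newline-delimited text once decompressed; decompression and S3 reads are left out, the function names are hypothetical, and the accession numbers in the example are made up.

```python
def parse_accessions(text: str) -> set[str]:
    """Parse a newline-delimited accession list, ignoring blank lines."""
    return {line.strip() for line in text.splitlines() if line.strip()}


def accessions_to_process(master: set[str], archived: set[str], instant: set[str]) -> list[str]:
    """Accessions in the master index that neither the archive inventory nor
    the instant instances have already covered, in a stable order."""
    return sorted(master - archived - instant)


master = parse_accessions("0000000001-24-000001\n0000000001-24-000002\n0000000002-24-000003\n")
archived = {"0000000001-24-000001"}
instant = parse_accessions("0000000002-24-000003\n")
print(accessions_to_process(master, archived, instant))  # ['0000000001-24-000002']
```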
S3 & R2
Notation:
- folders use dashes for spaces
- files use underscores for spaces
## S3
Access via IAM, since it’s AWS.
datamule-cloud/
sec/filings/
sec_master_submissions.csv.zst
sec_accessions.txt.zst
missing/
missing_sgml_archive.json.zst
missing_tar_archive.json.zst
missing_xbrl_archive.json.zst
missing_sec_filings_lookup.json.zst
inventory/
inventory_sgml_archive.txt.zst
daily_sgml_archive.txt.zst
instant_sgml_archive.txt.zst
inventory_tar_archive.txt.zst
daily_tar_archive.txt.zst
instant_tar_archive.txt.zst
inventory_xbrl_archive.txt.zst
daily_xbrl_archive.txt.zst
instant_xbrl_archive.txt.zst
inventory_sec_filings_lookup.txt.zst
inventory-sgml/
{BATCH_NUM}.txt.zst
inventory-tar/
{BATCH_NUM}.txt.zst
inventory-xbrl/
{BATCH_NUM}.txt.zst
xbrl/
{accession}.json.zst
data/
{accession}.json.zst
sec-filings-lookup/
delta_sec_accession_cik_table.csv.zst
delta_sec_submission_details_table.csv.zst
delta_sec_documents_table.csv.zst
complete_sec_accession_cik_table.csv.zst
complete_sec_submission_details_table.csv.zst
complete_sec_documents_table.csv.zst
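The `inventory-*/{BATCH_NUM}.txt.zst` keys suggest that each batch writes its own partial listing, which is later merged into a single inventory file (the Daily Inventory steps list a `combine_txt_batches_s3` task for this). Here is a minimal sketch of such a merge over already-decompressed text held in a dict of key to contents. The function is hypothetical, the `sec/filings/` prefix in the example is assumed, and because the tree above lost its indentation, batch keys are matched by suffix only.

```python
import re

# Matches e.g. ".../inventory-sgml/12.txt.zst"; the leading prefix is not assumed.
BATCH_KEY = re.compile(r"(?:^|/)inventory-(sgml|tar|xbrl)/(\d+)\.txt\.zst$")


def combine_batches(objects: dict[str, str], archive: str) -> str:
    """Merge the batch listings for one archive type into a sorted, de-duplicated listing."""
    accessions: set[str] = set()
    for key, text in objects.items():
        match = BATCH_KEY.search(key)
        if match and match.group(1) == archive:
            accessions.update(line.strip() for line in text.splitlines() if line.strip())
    return "".join(f"{acc}\n" for acc in sorted(accessions))


batches = {
    "sec/filings/inventory-sgml/0.txt.zst": "0000000001-24-000001\n0000000001-24-000002\n",
    "sec/filings/inventory-sgml/1.txt.zst": "0000000001-24-000002\n0000000002-24-000003\n",
    "sec/filings/inventory-tar/0.txt.zst": "0000000009-24-000009\n",
}
print(combine_batches(batches, "sgml"), end="")
```

Sorting makes the combined file deterministic, so re-running the step after a partial failure writes the same contents.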
## R2
Can't use IAM, since it’s Cloudflare.
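Because R2 doesn’t use IAM, access goes through R2 API tokens against R2’s S3-compatible endpoint, so boto3 still works. A minimal sketch, assuming `sgml-archive/` and `tar-archive/` are key prefixes within one bucket (they could equally be separate buckets) and reading “application type is zst” as storing objects with the `application/zstd` media type; the helper names are hypothetical.

```python
ZSTD_CONTENT_TYPE = "application/zstd"  # media type registered for Zstandard in RFC 8878


def r2_key(accession: str, kind: str) -> str:
    """Object key for an archived filing, e.g. sgml-archive/{accession}.sgml."""
    if kind not in ("sgml", "tar"):
        raise ValueError(f"unknown archive kind: {kind}")
    return f"{kind}-archive/{accession}.{kind}"


def make_r2_client(account_id: str, access_key_id: str, secret_access_key: str):
    """Create an S3 client pointed at Cloudflare R2 using an R2 API token, not IAM."""
    import boto3  # imported here so the key helpers work without boto3 installed

    return boto3.client(
        "s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        region_name="auto",
    )


def upload_archive(client, bucket: str, accession: str, kind: str, zst_bytes: bytes) -> str:
    """Upload already-compressed bytes and return the key they were stored under."""
    key = r2_key(accession, kind)
    client.put_object(Bucket=bucket, Key=key, Body=zst_bytes, ContentType=ZSTD_CONTENT_TYPE)
    return key
```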
sgml-archive/
{accession}.sgml # application type is zst
tar-archive/
{accession}.tar # application type is zst
Codebase
codebase/ # where we store our code and Docker configurations
daily/
bucket/
src/
update_missing_s3.py
update_sec_filings_data_s3.py
update_sec_filings_inventory_r2.py
update_sec_filings_inventory_s3.py
update_sec_filings_lookup_s3.py
update_sec_filings_sgml_r2.py
update_sec_filings_tar_r2.py
update_sec_filings_xbrl_s3.py
update_sec_master_submissions_index_s3.py
weakly_cleanup.py
docker/ # This is ECS
Dockerfile
entrypoint.sh
requirements.txt
mysql/
src/
update_sec_filings_lookup_mysql.py
docker/ # This is ECS
Dockerfile
entrypoint.sh
requirements.txt
orchestrator/
src/
calculate_s3_batches.py
calculate_inventory_batches.py
docker/ # THIS IS LAMBDA
Dockerfile
requirements.txt
utils/
src/
bucket_async_utils.py
bucket_utils.py
compression_utils.py
mysql_utils.py
instant/
src/
go.mod
monitor_efts_instant.py
monitor_rss_instant.py
production.json
updater_instant.py
utils.py
websocket_instant.go
deployment/
ecs/
daily/
bucket/
terraform/
mysql/
terraform/
main.tf
outputs.tf
variables.tf
databases/
terraform/
ecr/
terraform/
main.tf
outputs.tf
variables.tf
lambda/
terraform/
main.tf
variables.tf
outputs.tf
networking/
terraform/
main.tf
outputs.tf
variables.tf
steps/
component_pipelines/ # Pipelines that group tasks together, to be triggered by master pipelines
daily_database_s3.json
daily_database_mysql.json
daily_inventory.json
daily_s3.json
daily_setup.json
weekly_cleanup.json
master_pipelines/ # master pipelines
daily_pipeline.json
end_of_week_pipeline.json
scheduling/
terraform/
main.tf
outputs.tf
variables.tf
terraform/
main.tf
variables.tf
outputs.tf
config.tfvars # environment variables
secrets.tfvars # secrets, not committed to GitHub
documentation/
buckets.md
codebase.md
databases.md
definitions.md
instant_readme.md
misc.md
pipeline_daily.md
pipeline_instant.md
.gitignore # ignores secrets.tfvars
todo.md # future todo
Daily Pipeline
## Description
Pipeline that runs daily to update buckets and databases, as well as catch missing filings.
## Schedule
Pipeline does not run if a previous instance is still running. In that case, it tries again the next day.
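One way to implement the “skip if still running” rule is a check at the start of the pipeline that asks Step Functions for running executions of the same state machine while ignoring the current one. A minimal sketch using boto3’s `list_executions`; wiring it in (for example, a Lambda step that receives `$$.Execution.Id` followed by a Choice state) is an assumption, and the function name is hypothetical.

```python
def previous_run_in_progress(sfn_client, state_machine_arn: str, current_execution_arn: str = "") -> bool:
    """True if an execution other than the current one is still RUNNING."""
    # Two results are enough: at most one of them can be the current execution itself.
    response = sfn_client.list_executions(
        stateMachineArn=state_machine_arn,
        statusFilter="RUNNING",
        maxResults=2,
    )
    return any(
        execution["executionArn"] != current_execution_arn
        for execution in response["executions"]
    )
```

In practice `sfn_client` would be `boto3.client("stepfunctions")`; passing it in keeps the check testable.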
Daily Pipeline (Tuesday-Saturday 3 AM ET)
1. Daily Setup
2. Daily S3
3. Daily Database S3
4. Daily Database MySQL
End of Week Pipeline (Sunday)
1. Weekly Cleanup
2. Inventory
3. Daily Pipeline
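EventBridge Scheduler accepts a time zone for cron schedules, which avoids the one-hour shift that a UTC-only cron rule would show across daylight saving time. A minimal sketch of the `create_schedule` arguments for the Tuesday to Saturday 3 AM ET run; the schedule name and ARNs are placeholders, and the same fields map onto Terraform’s `aws_scheduler_schedule` resource.

```python
DAILY_CRON = "cron(0 3 ? * TUE-SAT *)"  # minute hour day-of-month month day-of-week year


def daily_schedule_params(state_machine_arn: str, role_arn: str) -> dict:
    """Keyword arguments for boto3's scheduler.create_schedule (daily pipeline)."""
    return {
        "Name": "daily-pipeline",  # placeholder name
        "ScheduleExpression": DAILY_CRON,
        "ScheduleExpressionTimezone": "America/New_York",  # 3 AM ET year-round
        "FlexibleTimeWindow": {"Mode": "OFF"},  # fire at the exact time
        "Target": {"Arn": state_machine_arn, "RoleArn": role_arn},
    }
```

Usage would be `boto3.client("scheduler").create_schedule(**daily_schedule_params(sm_arn, role_arn))`. The Sunday pipeline would follow the same pattern with `SUN` in the day-of-week field; its time of day isn’t specified above.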
## First Start
1. Run End of Week Pipeline Manually
## Pipelines
### Daily Setup
Sequential.
1. `update_sec_master_submissions_index_s3`
2. `update_missing_s3`
### Daily S3
1. `calculate_s3_batches` - Lambda -> ECS
2. `update_sec_filings_sgml_r2`
3. `update_sec_filings_tar_r2`
4. `update_sec_filings_xbrl_s3`
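`calculate_s3_batches` runs as a Lambda and hands batches to ECS. A minimal sketch of that split, assuming the Lambda returns index ranges rather than full accession lists (Step Functions caps state payloads at 256 KiB, so small ranges or batch numbers keep the payload light) and that a Map state fans them out to Fargate tasks; the event shape and function names are hypothetical.

```python
import math


def calculate_batches(n_items: int, max_batches: int, min_batch_size: int = 1) -> list[dict]:
    """Split n_items into at most max_batches contiguous [start, end) ranges."""
    if n_items <= 0:
        return []
    # Cap the batch count so every batch but the last holds at least min_batch_size items.
    n_batches = max(1, min(max_batches, n_items // min_batch_size))
    size = math.ceil(n_items / n_batches)
    return [
        {"batch_num": i, "start": start, "end": min(start + size, n_items)}
        for i, start in enumerate(range(0, n_items, size))
    ]


def handler(event, context):
    """Lambda entry point; the output feeds a Map state that starts one ECS task per batch."""
    return {"batches": calculate_batches(event["n_items"], event.get("max_batches", 10))}
```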
### Daily Database S3
Runs in parallel, but not concurrently. Updates `delta`, then appends `delta` to `complete`.
- `update_sec_filings_lookup_s3`
- `update_sec_accession_cik_table_s3()`
- `update_sec_submission_details_table_s3()`
- `update_sec_documents_table_s3()`
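The delta-then-append step can be sketched with the standard `csv` module. This minimal version assumes both tables are already decompressed into strings and share a header row; compression, S3 I/O, and the function name are assumptions. It returns the whole new `complete` file, which would then be written back to S3.

```python
import csv
import io


def append_delta(complete_csv: str, delta_csv: str) -> str:
    """Return complete_csv with the data rows of delta_csv appended."""
    complete = list(csv.reader(io.StringIO(complete_csv)))
    delta = list(csv.reader(io.StringIO(delta_csv)))
    if not delta:
        return complete_csv  # nothing new today
    header, new_rows = delta[0], delta[1:]
    if complete and complete[0] != header:
        raise ValueError("delta header does not match complete header")
    rows = complete if complete else [header]
    rows.extend(new_rows)
    out = io.StringIO()
    csv.writer(out, lineterminator="\n").writerows(rows)
    return out.getvalue()
```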
Future:
- `update_sec_filings_proxy_voting_records_s3` # start in 2024
- `update_sec_filings_insider_transactions_s3` # start date TBD
- `update_sec_filings_institutional_holdings_s3` # start date TBD
- `update_sec_filings_items_s3` # start for select document types and file extensions
- `update_sec_filings_data_s3`
- `update_sec_filings_company_classifications`
### Daily Database MySQL
Parallel, but not concurrent.
- `update_sec_filings_lookup_mysql`
Future:
- `update_sec_filings_proxy_voting_records_mysql`
- `update_sec_filings_insider_transactions_mysql`
- `update_sec_filings_institutional_holdings_mysql`
### Weekly Cleanup
1. `weakly_cleanup`
### Daily Inventory
- `calculate_inventory_batches` - Lambda -> ECS
- Trigger `update_sec_filings_inventory_s3` batches
- Trigger `combine_txt_batches_s3` (no batching)
- Trigger `update_sec_filings_inventory_r2` batches
- Trigger `combine_txt_batches_s3` (no batching)
Instant Pipeline
- `monitor_efts_instant`: sends POST requests to `updater_instant` when EFTS updates.
- `monitor_rss_instant`: sends POST requests to `updater_instant` when the RSS feed updates.
- `updater_instant`: receives new filings and updates datasets in real time.
- `websocket_instant`: disseminates new filings to users in real time.
I should note that this architecture has not been deployed yet. I’ve spent about six weeks writing it out without testing, so tomorrow’s deployment will be fun. Everything is going to break slightly. But overall, I’m confident that the architecture is sound.
Misc design decisions:
- I don’t like waiting: I originally designed a simpler architecture without concurrency, which would have made a fresh start take 1–2 weeks. A fresh start (excluding pulling from the SEC, which is rate limited) now takes a few hours.
- I get distracted: early on in v2, a bunch of companies and accelerators asked for flyouts and meetings. I don’t know how to business yet, so this required a lot of brainpower. My architecture was heavily influenced by: “I am not going to look at this for a week; it needs to make sense instantly when I come back.”
Other Misc
- I need to revamp datamule’s dashboard to account for the new products, and also set up subscriptions and invoices. Companies want invoices.
- I need to set up the AWS S3 distribution system. I’m currently looking at presigned URLs, cached against a hash of each user’s unique ID. My existing cloud only uses R2, which has zero egress fees.
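A minimal sketch of presigned URLs cached per user, under one reading of the plan: URLs are keyed by a SHA-256 hash of the user ID plus the object, and are re-signed shortly before they expire. The signer is injected so the cache can be tested without AWS; in practice it could wrap boto3’s `generate_presigned_url`. The class and parameter names are hypothetical.

```python
import hashlib
import time
from typing import Callable, Dict, Tuple

Signer = Callable[[str, str, int], str]  # (bucket, key, expires_in) -> url


class PresignedUrlCache:
    """Reuse a presigned URL per (user hash, bucket, key) until it is close to expiring."""

    def __init__(self, sign: Signer, expires_in: int = 3600, refresh_margin: int = 300,
                 clock: Callable[[], float] = time.time):
        self.sign = sign
        self.expires_in = expires_in
        self.refresh_margin = refresh_margin
        self.clock = clock
        self._cache: Dict[Tuple[str, str, str], Tuple[str, float]] = {}

    def get(self, user_id: str, bucket: str, key: str) -> str:
        user_hash = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
        cache_key = (user_hash, bucket, key)
        now = self.clock()
        cached = self._cache.get(cache_key)
        # Serve the cached URL only if it stays valid for more than refresh_margin seconds.
        if cached is not None and cached[1] - now > self.refresh_margin:
            return cached[0]
        url = self.sign(bucket, key, self.expires_in)
        self._cache[cache_key] = (url, now + self.expires_in)
        return url


# Usage with boto3 (not executed here):
# s3 = boto3.client("s3")
# cache = PresignedUrlCache(lambda b, k, e: s3.generate_presigned_url(
#     "get_object", Params={"Bucket": b, "Key": k}, ExpiresIn=e))
```

Hashing the ID keeps raw user IDs out of the cache keys, and tracking the expiry from signing time means a user never receives a URL that is about to lapse.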
Sample of Forthcoming Products
Small sample of forthcoming products. There will be a lot more. My goal is to get dozens or so datasets out per week.
SEC Filings SGML R2
SEC Filings Tar R2
SEC Filings XBRL S3
SEC Filings Fundamentals S3
SEC Filings Text S3
SEC Filings Markdown S3
SEC Filings Data S3
SEC Filings Data Tuples S3
SEC Filings Items S3
SEC Proxy Voting Records S3
SEC Institutional Holdings S3
SEC Insider Transactions S3
SEC Filings Lookup MySQL
SEC Proxy Voting Records MySQL
SEC Institutional Holdings MySQL
SEC Insider Transactions MySQL
SEC Items MySQL
SEC Websocket
SEC Filings Entities S3
Managed Datamule Instances
SEC Filings Keyword Search S3
SEC Indicators S3
SEC Business Development Company Investments S3
SEC Business Classifications S3
SEC Filings Tables S3
SEC Filings Classified Data Tuples S3
Datamule Entity API
doc2dict API
xml2tables API
SEC Company Metadata S3
SEC Filings Complexity S3
SEC Filings Sentiment S3
If you’ve read this far
I apologize for the sloppy writing. I am tired. I hope that I did not accidentally leave bad words in my documentation.