AWS Apache Managed Airflow EMR ModuleNotFoundError: No module named ‘requests’ Bootstrap

I came across another fun one the other day. We are in the process of migrating our on-premises Elastic MapReduce system into the cloud. We are using AWS EMR, with AWS Managed Airflow orchestrating the jobs as DAGs. We came across an odd situation with a PySpark application. When using Airflow with a SparkSubmitHook, the job would bootstrap just fine according to the run logs, but it would fail with No module named 'requests' when the application tried to import it. This was very odd, since the same application runs fine when we call spark-submit from the master node command line.

I decided to investigate the differences. Our bootstrap script for installing Python modules via pip, which we invoke through the EMR RunJobFlow API call, looks like this:

This is very basic: all it does is upgrade pip and run pip install for each of the modules. When checking the bootstrap log I could see that pip upgrades, goes out to the repo, and installs the packages just fine. So why were we getting the No module named 'requests' error when executing through Airflow?

After a ton of googling and research I found the issue and applied a solution that worked. It turns out Airflow will run as the root user when bootstrapping, and, if you notice, we use the --user argument in pip. This tells pip to install the packages into the calling user's home directory. The kicker is that the code is run by the hadoop user on the EMR cluster nodes after being submitted from Airflow. So the hadoop user is unable to access the requests module, since root installed it with --user.

I changed the bootstrap script to the following and it all started working. By removing --user and prefixing with sudo, the packages now get installed in a globally available location for all users. I am sure there are better ways to do this, and I am still learning and researching, but if you run into this, the change below will get you out of the woods.
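If you want to confirm this kind of mismatch yourself, a small Python check can help. This is only a sketch of mine (the function name describe_module is made up for illustration): run it once as root and once as hadoop on a cluster node and compare where, if anywhere, the module resolves from.

```python
import importlib.util
import site
import sys


def describe_module(name):
    """Report whether `name` is importable for the current user, and from where."""
    spec = importlib.util.find_spec(name)
    user_site = site.getusersitepackages()
    return {
        "module": name,
        "found": spec is not None,
        "location": getattr(spec, "origin", None),
        # `pip install --user` writes here; every OS user has a different one.
        "user_site": user_site,
        "user_site_on_path": user_site in sys.path,
    }


for key, value in describe_module("requests").items():
    print(f"{key}: {value}")
```

If the location sits under one user's home directory, other users will not see the package.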

After some further research and testing, we decided to use a requirements.txt file that is fetched by the bootstrap shell script in the RunJobFlow call. First create a requirements.txt file. I like to pin the versions so nothing changes unexpectedly when you bootstrap a new cluster and it reaches out to PyPI to get the packages.

https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
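For orientation, the part of a RunJobFlow request that points at a bootstrap script is shaped roughly like this when built in Python. Treat it as a sketch: the helper name and the bucket name are placeholders I made up, and the rest of the request (instances, roles, steps) is left out.

```python
def bootstrap_actions(script_s3_path, name="Install Python packages"):
    """Build the BootstrapActions list for an EMR RunJobFlow request."""
    return [
        {
            "Name": name,
            "ScriptBootstrapAction": {
                "Path": script_s3_path,  # S3 location of the bootstrap script
                "Args": [],
            },
        }
    ]


# Hypothetical bucket name, for illustration only.
actions = bootstrap_actions("s3://my-example-bucket/bootstrap.sh")
print(actions)
```

With boto3 this list would be passed as the BootstrapActions argument of the EMR client's run_job_flow call, next to the other required arguments.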

Add your desired packages and version numbers to a file called requirements.txt like below:

Then you will need to copy this file into a bucket you have access to:

Then create a shell script that has the following, call it bootstrap.sh:

Copy that shell script to your bucket:

And execute it via the bootstrap actions in the RunJobFlow EMR API call:

As you can see, the shell script will be executed, which copies the requirements.txt file locally and then runs pip install -r against it to install all the packages. If you want to see the log on a running cluster, you can ssh to the master node and view the logs here to see the bootstrapping take place:

You should see the stdout log as so:

Hope this helps.

Node Application Stopped Sending Updates To Slack – can’t identify protocol

I wanted to share my experience with a Node application that I support. This particular application is an API, and it logs each and every request it receives to an internal Slack channel. Our team uses this channel for many things: to verify when the API is in maintenance, to check that requests are processing, to see the overall health of the API, etc.

Once in a while, out of nowhere, we would stop receiving these updates in Slack. I set out to troubleshoot why this was happening. At first we thought that we were hitting the Slack rate limits, which are clearly defined here:

https://api.slack.com/docs/rate-limits

However after reading the linked doc, I was skeptical. The API does serve a lot of requests, but not enough to hit their limit. We have 2 servers that send slack messages and process the API requests and when they stopped sending it would be both servers, not just one. Also we have run into this before and restarting the service fixed the issue, so I was sure we did not hit the rate limit. Also trying to send a manual slack update using curl would not work! I knew this had to be something with the linux OS itself, and not the Slack service.

I tried to use netstat to see if we were hitting some type of OS limit, and all looked well. Next I tried one of my favorite tools, lsof. At first I grepped for deleted to see if something was being held and not released, but nothing stood out. Next I grepped for node and, lo and behold, I saw this:

My eyes went right to the “can’t identify protocol”. I opened up a browser and started to research, and the first hit when searching for “can’t identify protocol” was a Stack Overflow question with the solution.

https://stackoverflow.com/questions/7911840/seeing-too-many-lsof-cant-identify-protocol

When lsof prints “Can’t identify protocol”, this usually relates to sockets (it should also say ‘sock’ in the relevant output lines).

So, somewhere in your code you are probably connecting sockets and not closing them properly (perhaps you need a finally block).

I suggest you step through your code with a debugger (easiest to use your IDE, potentially with a remote debugger, if necessary), while running lsof side-by-side. You should eventually be able to see which thread / line of code is creating these File Descriptors.
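The leak pattern that answer describes is easy to reproduce in a few lines. The sketch below is Python rather than Node, and it is not our application's code: it just opens some sockets, "forgets" them, and counts the process's file descriptors before and after closing. It assumes Linux, since it reads /proc/self/fd.

```python
import os
import resource
import socket


def open_fd_count():
    """Number of file descriptors this process currently holds (Linux only)."""
    return len(os.listdir("/proc/self/fd"))


def leak_demo(n=10):
    """Open n sockets without closing them, then close them, counting fds each time."""
    before = open_fd_count()
    sockets = [socket.socket() for _ in range(n)]  # the "forgotten" sockets
    during = open_fd_count()
    for s in sockets:
        s.close()  # the step a leaking code path never reaches
    after = open_fd_count()
    return before, during, after


soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open-file limits: soft={soft} hard={hard}")
print("fd counts (before, during, after):", leak_demo())
```

The middle count grows by one per unclosed socket, and a long-running process that keeps doing this eventually runs into the open-file limit.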

It turns out that the Node application was opening file descriptors / sockets and not closing them properly, which caused it to hit the hard limit on open files / file descriptors. You can view the hard and soft limits like so: switch to the user that the application is running as and run:

So you can see that the nodeuser has a hard limit of 4096 open files, and because the application was not properly closing them, we hit that ceiling. This explains why restarting the server or the process fixed it: the restart released the open file descriptors and the system was able to open sockets again. I spoke with the developer and we researched it. It looks like one of the modules we were using was the cause of the issue, or perhaps we were using it wrong. I found this out from this article:
https://stackoverflow.com/questions/24922745/node-js-winston-how-to-safely-drain-a-logger

Question:

I have experimented with instantiating and closing winston loggers as (half) described on https://github.com/flatiron/winston#instantiating-your-own-logger, to no avail. I run into trouble closing file transports of Winston’s – walking through it’s source code, I found that the proper way to close off a logger would seem to be the close method. I expected this to take care of closing the transport file used by the logger – however that turned out to be not so.

Varying in frequency according to node.js server load, winston would still hold on to many transport files, infinitely long after the close method had been called for them, indefinitely long after no new writes were being initiated to them. I observed that through the node.js process file descriptors table (lsof -p). Even though close has been called for a Winston logger, it would indefinitely keep the file descriptor of the log file “in use”, i.e. the log file never gets really closed. Thus leaking file descriptors and eventually making the node.js process bump into the ulimit (-n) limit after my application has been up for long.

Should there be a specific programming pattern for draining a Winston logger such that it can be eventually closed?

Answer:

Create only one logger instance and then derive children from it. In this case, winston will hold only one open file handler. Might also be better for performance.
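The same idea can be shown with Python's standard logging module. To be clear, this is an analogy of mine and not Winston code: one parent logger owns the only file handler, and the children just pass their records up to it. The logger names and the temporary log path are made up for the example.

```python
import logging
import os
import tempfile

log_path = os.path.join(tempfile.mkdtemp(), "api.log")

# One parent logger owns the only file handler (one open file descriptor).
root = logging.getLogger("api")
root.setLevel(logging.INFO)
root.addHandler(logging.FileHandler(log_path))

# Children add no handlers of their own; their records propagate to the parent.
requests_log = root.getChild("requests")
slack_log = root.getChild("slack")

requests_log.info("GET /status")
slack_log.info("posted update")

print("handlers on parent:", len(root.handlers))
print("handlers on child:", len(requests_log.handlers))
```

However many child loggers get created, the number of open log files stays at one.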

So that was it, the developers agreed and set out to create a patch, problem solved.

Capture AWS CLI Output With Timestamps On Each Line Of Output

I needed a way to capture output from the aws cli into a log file with timestamps. Out of the box, the aws cli output has no timestamps. If you execute an aws s3 cp command, something like this:

You will see output like so:

As you can see, there is no timestamp on each line of output from the aws cli. So I scoured the internet and found out some interesting things. It turns out that, out of the box, the aws cli writes its output with carriage returns instead of newlines, so the standard awk piping methods were not working. The aws cli can also change its output format, so I needed to add a cli parameter to set the output to text. Next I needed to use tr to substitute the carriage returns with newlines. Finally, I could pipe to awk and print a timestamp on each line of output from the aws cli. The final command and output look like this:

Produces the following in the log file which is my desired result:

I hope this helps someone else as it was a bear to solve for me.
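If you ever need the same transformation inside a Python script instead of a shell pipeline, here is a small sketch of the two steps: turn carriage returns into newlines, then prefix each line with a timestamp. The function name and the timestamp format are my own choices, not something the aws cli provides.

```python
from datetime import datetime


def timestamp_lines(chunks, now=datetime.now):
    """Yield timestamped lines from text that may use \\r as a line separator."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk.replace("\r", "\n")  # same job as tr '\r' '\n'
        *complete, buffer = buffer.split("\n")
        for line in complete:
            if line.strip():  # skip blank lines
                yield f"{now():%Y-%m-%d %H:%M:%S} {line}"
    if buffer.strip():  # trailing text without a final newline
        yield f"{now():%Y-%m-%d %H:%M:%S} {buffer}"
```

Feeding it the pieces read from a subprocess's stdout (or from sys.stdin) gives one timestamped line per progress update.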

centos8 postgresql-11-check-db-dir[]: is missing or empty

We have been rolling out CentOS 8 in our lower environments for testing. We use dedicated VMware virtual servers with a CentOS 8 minimal install, and apart from the main application, which is PostgreSQL 11 (pg11) here, we only apply hardening to these systems. These systems use an LVM-mounted ext4 filesystem for the data directory.

Recently, on 3 of the new PG VMs, we noticed after a reboot that PG did not start, even though we had enabled the systemd service to start on boot. This also seemed intermittent. I checked the pg startup log and did not find much about the issue, so I checked /var/log/messages and found it.

I checked the systemd service file and saw that out of the box postgres had the following:

After=syslog.target This is a special target unit in systemd and is the standardized name to pull in a syslog implementation.

After=network.target has very little meaning during start-up. It only indicates that the network management stack is up after it has been reached. Whether any network interfaces are already configured when it is reached is undefined.

WantedBy=multi-user.target normally defines a system state where all network services are started up and the system will accept logins, but a local GUI is not started. This is the typical default system state for server systems, which might be rack-mounted headless systems in a remote server room.

Those options above will not ensure that all filesystems in fstab are mounted before postgres starts. So what we were seeing was a classic race condition where postgres started before the data directory was mounted. As I previously mentioned we use a custom PGDATA location. So after some research I found my option that fixed this. You will need to edit the pg11 service and add the following, then reload systemd and reboot and all should work. You can find your LVM mount by running the following:

You can see my u02-data1.mount in the output, so edit the service and add the override file with the following. If you have multiple mounts, you can add them as well.
Edit with: systemctl edit postgresql-11

Reload the daemon with: systemctl daemon-reload

After=local-fs.target systemd-fstab-generator(3) automatically adds dependencies of type Before= to all mount units that refer to local mount points for this target unit. In addition, it adds dependencies of type Wants= to this target unit for those mounts listed in /etc/fstab that have the auto mount option set.
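If you have several data mounts, the mount unit names can be worked out from the mount points instead of reading them off the list. The sketch below approximates what systemd-escape --path --suffix=mount does and prints an After= drop-in. It makes two assumptions of mine: that the mount point behind u02-data1.mount is /u02/data1, and that plain After= ordering is what you want (systemd also has RequiresMountsFor=, which takes paths directly).

```python
import re


def mount_unit_name(path):
    """Approximate `systemd-escape --path --suffix=mount` for a mount point."""
    trimmed = re.sub(r"/+", "/", path).strip("/")
    if not trimmed:
        return "-.mount"  # the root filesystem

    def escape(match):
        # Anything outside [A-Za-z0-9:_.] becomes \xNN, including "-".
        return "".join(f"\\x{b:02x}" for b in match.group().encode())

    escaped = re.sub(r"[^A-Za-z0-9:_./]", escape, trimmed)
    if escaped.startswith("."):
        escaped = "\\x2e" + escaped[1:]
    return escaped.replace("/", "-") + ".mount"


def drop_in(mount_points):
    """Render a [Unit] override ordering the service after the given mounts."""
    units = " ".join(mount_unit_name(p) for p in mount_points)
    return f"[Unit]\nAfter={units}\n"


print(drop_in(["/u02/data1"]))  # assumed mount point for u02-data1.mount
```

The printed text is the kind of content that goes into the editor opened by systemctl edit.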

Automate pg_dump pg_restore Of Tables From Config File Send Slack Update

You can use this Python code to set up a cron job that syncs Postgres tables from one database to another. It reads from a config file and can handle multiple tables in the same run. This can be useful for syncing a daily table from a source to its destinations. It also sends an alert to Slack reporting whether the run was OK or critical.
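To give a feel for the config-driven part, here is a minimal sketch that reads a config and builds one pg_dump / pg_restore command pair per table. The config keys, host names, and table names are all invented for the example, and it only builds the commands; it does not run them or talk to Slack.

```python
import configparser

# Hypothetical config layout; every key and value here is an assumption.
SAMPLE_CONFIG = """
[sync]
source_host = src-db.example.internal
source_db = appdb
dest_host = dst-db.example.internal
dest_db = reporting
tables = public.daily_orders, public.daily_events
dump_dir = /tmp
"""


def build_commands(config_text):
    """Return one (pg_dump, pg_restore) command pair per configured table."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    cfg = parser["sync"]
    pairs = []
    for table in (t.strip() for t in cfg["tables"].split(",")):
        dump_file = f"{cfg['dump_dir']}/{table}.dump"
        dump = ["pg_dump", "--host", cfg["source_host"], "--dbname", cfg["source_db"],
                "--table", table, "--format", "custom", "--file", dump_file]
        restore = ["pg_restore", "--host", cfg["dest_host"], "--dbname", cfg["dest_db"],
                   "--clean", "--if-exists", dump_file]
        pairs.append((dump, restore))
    return pairs


for dump_cmd, restore_cmd in build_commands(SAMPLE_CONFIG):
    print(" ".join(dump_cmd))
    print(" ".join(restore_cmd))
```

Each list can be handed to subprocess.run(cmd, check=True), with the Slack alert sent depending on whether that call raises.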

LOGGING EXAMPLE:

Python Remove Files That Match Pattern Older Than N Days

This is a neat little script that implements find in pure Python. It can be passed different patterns and directories. The script walks the directories, matches the patterns, builds a list of files, and gets the ctime of each. Each ctime is compared against a cutoff date you set, and files older than the cutoff are removed. This is great for cleaning up application logs that clog up the filesystem.
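A minimal sketch of that approach is below. The function names, the dry_run switch, and the example directory are mine, so treat it as an illustration of the idea rather than the exact script.

```python
import fnmatch
import os
import time


def find_old_files(directories, patterns, days, now=None):
    """Return files under `directories` matching any pattern whose ctime is older than `days`."""
    cutoff = (now if now is not None else time.time()) - days * 86400
    matches = []
    for directory in directories:
        for root, _dirs, files in os.walk(directory):
            for name in files:
                if any(fnmatch.fnmatch(name, p) for p in patterns):
                    path = os.path.join(root, name)
                    if os.stat(path).st_ctime < cutoff:
                        matches.append(path)
    return matches


def remove_files(paths, dry_run=True):
    """Delete the given files, or only report them when dry_run is set."""
    for path in paths:
        if dry_run:
            print("would remove", path)
        else:
            os.remove(path)


# Example directory and patterns are placeholders.
remove_files(find_old_files(["/var/log/myapp"], ["*.log", "*.gz"], days=30))
```

Running it with dry_run left on first is a cheap way to check the patterns before anything is deleted.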

AWS CLI Max Concurrent Requests Tuning

In this post I would like to go over how I tuned a test server for copying / syncing files from the local filesystem to S3 over the internet. If you ever had the task of doing this, you will notice that as the file count grows, so does the time it takes to upload the files to S3. After some web searching I found out that AWS allows you to tune the config to allow more concurrency than default.
AWS CLI S3 Config

The parameter that we will be playing with is max_concurrent_requests
This has a default value of 10, which allows only 10 concurrent requests to the AWS API for S3. Let's see if we can make some changes to that value and get some performance gains. My test setup is as follows:

I have 56 102MB files in the test directory:

For the first test I am going to run aws s3 sync with no changes, so out of the box it should have 10 max_concurrent_requests. Let's use the Linux time command to measure how long it takes to copy all 56 files to S3. I will delete the folder on S3 before each iteration to keep the tests the same. You can also view the port 443 connections via netstat and count them to see what's going on. Across all the tests, my best result came with a value of 250. So as you will see, you need to play with the setting to get the best result, and the best value will change along with the server specs.

1. 1m25.919s with the default configuration:

2. Now let's set the max concurrent requests to 20 and try again. You can do this with the command below. After running, we can see a little gain.

3. Bumped up to 50 shows a bit more gain:

4. Bumped up to 100, I start to notice that we lost some speed:

5. Bumped up to 250 we see the best result so far:

6. Bumped up to 500, we lose performance, most likely due to the machine resources.

So to wrap up, you can tune the amount of concurrent requests allowed from the aws cli to s3, you will need to play with this setting to get the best results for your machine.
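If you want to repeat this kind of test on your own machine, the loop is easy to script. The sketch below is my own harness, not what I ran for the numbers above: for each value it sets default.s3.max_concurrent_requests with aws configure set, times aws s3 sync, then removes the destination prefix so the next run starts clean. The source and destination arguments are whatever paths you pass in.

```python
import subprocess
import time


def time_sync(source, destination, concurrency, run=subprocess.run):
    """Set max_concurrent_requests, run `aws s3 sync`, and return elapsed seconds."""
    run(["aws", "configure", "set",
         "default.s3.max_concurrent_requests", str(concurrency)], check=True)
    start = time.monotonic()
    run(["aws", "s3", "sync", source, destination], check=True)
    elapsed = time.monotonic() - start
    # Clean up so every iteration uploads the full set of files.
    run(["aws", "s3", "rm", destination, "--recursive"], check=True)
    return elapsed


def benchmark(source, destination, values=(10, 20, 50, 100, 250, 500), run=subprocess.run):
    """Time one sync per concurrency value; returns {value: seconds}."""
    return {n: time_sync(source, destination, n, run=run) for n in values}
```

The run parameter is only there so the harness can be exercised without touching AWS. In normal use you leave it at its default.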

Mass Rename Files In Gcloud With Python Multiprocessing Parallel Gsutil

I had been tasked with renaming 50000 files in place, up in the cloud, without bringing the files down locally. I looked at using wildcards with gsutil, but I was not able to remove what I wanted from the filenames, so I set out to create a shell script to perform the task. I created a listing of files with gsutil and did some awk magic to get just the filenames into listing2.txt. Then I wrote the following loop.

This will rename the files stripping out what I wanted, files go from:

work-data-sample__0_0_1.csv.gz to data-sample__0_0_1.csv.gz

I launched it and noticed something odd: it was only iterating over the list and making one call at a time to the gcloud API to rename each file. This was going to take forever, and it actually took 24 hours. I did some reading of the docs and saw that gsutil has a -m option for multiprocessing. I also checked the source code, and it looks like gsutil is multiprocess out of the box.

gsutil source code:

This is basically saying that if the OS can handle multiprocessing, spawn the same number of processes as the system has CPUs, and then set the thread count to 5. So my for loop in bash would have taken forever with the -m option as well, since each call only renamed a single file.

So I created some Python code to solve this issue. It performs all the steps in one go: it lists the files, substrings out the filename, and uses Python's multiprocessing to spawn 25 workers that do the API calls in chunks. I learned a lot from this and I hope it helps others. I will add comments in the code to show what's going on.

You can see the process spawns 25 worker processes that will iterate over the list and perform the move in chunks.
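Stripped down to the essentials, the pattern looks something like this. It is a sketch: the helper names, the chunk size, and the assumption that the part to strip is a work- prefix are mine, and the file listing step is left out.

```python
import subprocess
from multiprocessing import Pool

PREFIX = "work-"  # the part of the filename to strip


def new_name(name, prefix=PREFIX):
    """work-data-sample__0_0_1.csv.gz -> data-sample__0_0_1.csv.gz"""
    return name[len(prefix):] if name.startswith(prefix) else name


def mv_command(bucket, name):
    """Build the gsutil mv call that renames one object in place."""
    return ["gsutil", "mv", f"gs://{bucket}/{name}", f"gs://{bucket}/{new_name(name)}"]


def rename_one(args):
    bucket, name = args
    return subprocess.run(mv_command(bucket, name)).returncode


def rename_all(bucket, names, workers=25, chunksize=100):
    """Spread the renames across a pool of worker processes, in chunks."""
    with Pool(processes=workers) as pool:
        return pool.map(rename_one, [(bucket, n) for n in names], chunksize=chunksize)
```

Call rename_all from under an if __name__ == "__main__": guard so the worker processes do not re-run it on import.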

Postgres Long Running Active Queries Send To Slack

I needed a utility to alert our team when any long running queries were running on a production postgres cluster. I came up with the following python code that achieves just that. This would alert slack if an active query exceeds 45 mins. The script takes in user parameters as well, I will demonstrate the way to call it. Hope it helps someone.
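The core of the idea fits in a short sketch: one query against pg_stat_activity and one JSON payload for a Slack incoming webhook. The function names are mine, the connection is assumed to be a psycopg2-style DB-API connection that you open yourself, and the webhook URL is whatever your Slack workspace gives you.

```python
import json
import urllib.request

LONG_QUERY_SQL = """
SELECT pid, usename, now() - query_start AS runtime, left(query, 200) AS query
FROM pg_stat_activity
WHERE state = 'active'
  AND pid <> pg_backend_pid()
  AND now() - query_start > %s * interval '1 minute'
ORDER BY runtime DESC
"""


def fetch_long_queries(conn, minutes=45):
    """conn: an open DB-API connection, e.g. from psycopg2.connect() (assumed)."""
    with conn.cursor() as cur:
        cur.execute(LONG_QUERY_SQL, (minutes,))
        return cur.fetchall()


def slack_payload(rows, minutes=45):
    """Format the rows as the JSON body of a Slack incoming-webhook message."""
    lines = [f"pid {pid} ({user}) running for {runtime}: {query}"
             for pid, user, runtime, query in rows]
    text = f"{len(rows)} active queries over {minutes} minutes:\n" + "\n".join(lines)
    return json.dumps({"text": text}).encode()


def post_to_slack(webhook_url, payload):
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url, data=payload, headers={"Content-Type": "application/json"})
    return urllib.request.urlopen(request, timeout=10).status
```

A cron wrapper would call fetch_long_queries and only post to Slack when it returns rows.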

CRON CALL:

CODE:

SLACK MESSAGE:

Python Function Execute Subprocess With Timeout

I have a project that rsyncs data from an RPM repository to maintain a local copy of that repo. The issue I was faced with was that the remote mirror would sometimes stop the rsync due to an overloaded network or other unforeseen issues. I wanted to use rsync's hashing algorithm to have it start right where it left off, so I wrote a function to do this. If 900 seconds was hit, it usually meant there was an issue with the transfer. I also want to state here that I observed the rsync stop-serving issue on many mirrors, so it was not just an issue with the TCP network. I use this in production and it logs each iteration or restart. The function below will also kill the current rsync so multiple copies are not running at the same time. I also only wanted to perform 5 iterations of rsync upon error or timeout, so I use a while loop here.
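The general shape of the technique is small enough to sketch on its own. This is a simplified stand-in with a name of my choosing (run_with_retries), not the production function: it leans on subprocess.run, which kills the child process when the timeout expires, and retries up to 5 times.

```python
import subprocess


def run_with_retries(command, timeout=900, max_attempts=5):
    """Run `command`, restarting it on timeout or non-zero exit, up to max_attempts times."""
    for attempt in range(1, max_attempts + 1):
        try:
            # On timeout, subprocess.run kills the child before raising,
            # so a second copy is never started alongside a stuck one.
            result = subprocess.run(command, timeout=timeout)
        except subprocess.TimeoutExpired:
            print(f"attempt {attempt}: timed out after {timeout}s, restarting")
            continue
        if result.returncode == 0:
            return True
        print(f"attempt {attempt}: exited with {result.returncode}, restarting")
    return False
```

Because rsync picks up where it left off, simply re-running the same command is enough for each restart.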

Here are the individual rsync commands in the INI configuration.

Here is how I call the execute_jobs_timeout() function:

The function:

Log Snippet showing each command executing: