botocore.exceptions.ReadTimeoutError: Read timeout on endpoint URL: https://lambda.us-east-1.amazonaws.com

Recently, while working on one of our EMR projects that uses Lambda functions and Airflow, I ran into the following timeout issue:

We have a Lambda function, invoked through boto3 from an Airflow step, that updates DynamoDB with values needed for our pipeline. It had worked in previous tests with no issues, but we had since added logic to the function, which made it take longer than before. When we tested the Lambda from the console it worked fine, albeit more slowly than the previous version. When called from Airflow, however, it kept hitting the timeout, and the retries caused the function to be executed multiple times.

I decided to test the function from the AWS CLI, and that revealed the issue: the default boto3 read timeout is 60 seconds, which was shorter than the time our Lambda was taking. So even though we had set the Lambda timeout to 4 minutes, boto3 was timing out at 1 minute and never got the response back from Lambda. We fixed this by having boto3 set up a lambda_config with a longer timeout.
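A minimal sketch of that kind of client configuration, assuming a 4 minute Lambda timeout plus a small safety margin. The helper names and the margin are hypothetical, and turning retries off is an optional choice on my part to avoid the repeated executions described above:

```python
def lambda_client_settings(lambda_timeout_s, margin_s=30):
    """Keyword arguments for botocore.config.Config, sized to a slow Lambda."""
    return {
        # Wait longer than the Lambda itself is allowed to run.
        "read_timeout": lambda_timeout_s + margin_s,
        "connect_timeout": 10,
        # Assumption: no automatic retries, so a slow call is not re-invoked.
        "retries": {"max_attempts": 0},
    }


def make_lambda_client(lambda_timeout_s, region_name="us-east-1"):
    """Build a boto3 Lambda client whose read timeout outlasts the function."""
    # Imported here so the settings helper can be used without boto3 installed.
    import boto3
    from botocore.config import Config

    lambda_config = Config(**lambda_client_settings(lambda_timeout_s))
    return boto3.client("lambda", region_name=region_name, config=lambda_config)
```

Calling make_lambda_client(240) would give a client with a 270 second read timeout, which can then be used for invoke() as usual.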

RequestsDependencyWarning: urllib3 (1.26.18) or chardet (3.0.4) doesn’t match a supported version!

I ran into this issue on a CentOS 8 server that has yet to be updated to RHEL 8, after upgrading some packages via pip:

Turns out:

The python3-requests module is not compatible with the locally installed third-party urllib3 module, version 1.26.8, which conflicts with the Red Hat-provided python3-urllib3 version 1.24.2-5.el8.
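When a pip-installed package shadows an RPM-provided one like this, it can help to see which versions Python actually resolves. A small sketch using only the standard library (Python 3.8 or newer); the function name is hypothetical:

```python
from importlib import metadata


def installed_versions(packages=("requests", "urllib3", "chardet")):
    """Map each package name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version}")
```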

I was able to get around this by upgrading urllib3 and requests:

Works Now:

Python Linux Find Files With Pattern Accessed Older Than N Days And Remove

This is a neat utility to keep in your sysadmin bag of tricks. It walks the directory you define recursively, grabs the access time of every file, and stores them in a list. It then compares them against a command-line parameter for the number of days ago. If a file is older than N days, it is removed. What's really nice about this utility is that it has a debug mode, so you can see what will be deleted before you turn debug off and execute it.
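A minimal sketch of the idea, not the full utility: the function name and parameters are hypothetical, and debug defaults to on so nothing is deleted by accident.

```python
import os
import time


def remove_old_files(root, days, debug=True, now=None):
    """Remove files under root whose access time is older than `days` days.

    In debug mode nothing is deleted; the candidates are only printed.
    Returns the list of stale paths either way.
    """
    now = time.time() if now is None else now
    cutoff = now - days * 86400
    stale = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                atime = os.stat(path).st_atime
            except OSError:
                continue  # file vanished or is unreadable; skip it
            if atime < cutoff:
                stale.append(path)
    for path in stale:
        if debug:
            print(f"[debug] would remove {path}")
        else:
            os.remove(path)
    return stale
```

Run it once with debug=True to review the list, then again with debug=False to actually delete.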

AWS EMR ImportError: this version of pandas is incompatible with numpy < 1.17.3

I found another one that I thought was worth writing a quick blog post about. We use AWS Elastic MapReduce with transient clusters, so in order to get the Python libraries installed, we need to use the bootstrap feature. We ran into many issues with the standard bootstrap script, which looked something like this:

The contents of requirements.txt looked like this:

We would get all the nodes in the cluster to bootstrap properly; however, the logs showed the following:

And when trying to import from pyspark, we saw this:

After speaking with AWS support, it turns out this was a known issue. When a cluster is launched, EMR first provisions the EC2 instances and then runs the bootstrap actions, so the bootstrap action installs the desired package versions. However, the EMR applications are installed after the bootstrap actions, and they override the custom installation of the Python packages. The workaround is a bootstrap action that delays the installation of the packages until the nodes are fully up and running. This resolves the conflict we had been seeing between pandas and numpy. Here is what our final working bootstrap.sh looks like. I hope this helps; it was a tough one to solve:

Automate pg_dump pg_restore Of Tables From Config File Send Slack Update

You can use this Python code to set up a cron job that syncs Postgres tables from one database to another. It reads from a config file and can handle multiple tables in the same run. This can be useful for syncing a daily table from a source to its destinations. It also sends an alert to Slack saying whether the run was OK or critical.
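A rough sketch of how such a sync might be wired together. The config layout (the [source], [destination], and [sync] sections), the /tmp dump location, and all function names are assumptions for illustration. Passwords are expected to come from ~/.pgpass or the environment:

```python
import configparser
import json
import subprocess
import urllib.request


def build_sync_commands(config_text):
    """Return (table, pg_dump command, pg_restore command) for each table."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    src, dst = parser["source"], parser["destination"]
    tables = [t.strip() for t in parser["sync"]["tables"].split(",") if t.strip()]
    jobs = []
    for table in tables:
        dump_file = f"/tmp/{table}.dump"  # hypothetical scratch location
        dump = ["pg_dump", "-h", src["host"], "-U", src["user"],
                "-d", src["dbname"], "-t", table, "-Fc", "-f", dump_file]
        restore = ["pg_restore", "-h", dst["host"], "-U", dst["user"],
                   "-d", dst["dbname"], "--clean", "--if-exists", dump_file]
        jobs.append((table, dump, restore))
    return jobs


def slack_payload(table, ok):
    """Build the JSON body for a Slack incoming webhook."""
    status = "OK" if ok else "CRITICAL"
    return {"text": f"[{status}] pg sync of table {table}"}


def send_slack(webhook_url, payload):
    """POST the payload to a Slack incoming webhook URL."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request, timeout=10)


def run_sync(config_text, webhook_url):
    """Dump and restore each table, reporting the result to Slack."""
    for table, dump, restore in build_sync_commands(config_text):
        ok = (subprocess.run(dump).returncode == 0
              and subprocess.run(restore).returncode == 0)
        send_slack(webhook_url, slack_payload(table, ok))
```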

LOGGING EXAMPLE:

Python Remove Files That Match Pattern Older Than N Days

This is a neat little script that implements find in pure Python. It can be passed different patterns and directories. The script walks the directories, matches the patterns, builds a list of files, and gets the ctime of each. Each ctime is then compared against a date you set, and files older than that date are removed. This is great for cleaning up application logs that clog up the filesystem.
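A compact sketch of that flow, with hypothetical function names. Keep in mind that on Linux st_ctime is the inode change time, not the creation time:

```python
import fnmatch
import os
import time


def find_matching_older_than(directories, patterns, days, now=None):
    """List files matching any pattern whose ctime is older than `days` days."""
    now = time.time() if now is None else now
    cutoff = now - days * 86400
    matches = []
    for directory in directories:
        for dirpath, _dirnames, filenames in os.walk(directory):
            for name in filenames:
                if not any(fnmatch.fnmatch(name, p) for p in patterns):
                    continue
                path = os.path.join(dirpath, name)
                if os.stat(path).st_ctime < cutoff:
                    matches.append(path)
    return matches


def remove_files(paths):
    """Delete every path in the list."""
    for path in paths:
        os.remove(path)
```

Keeping the search and the removal separate makes it easy to print the list first and only call remove_files() once it looks right.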

Mass Rename Files In Gcloud With Python Multiprocessing Parallel Gsutil

I had been tasked with renaming 50,000 files in place, up in the cloud, without bringing the files down locally. I looked at using wildcards with gsutil, but I was not able to remove what I wanted from the file name, so I set out to create a shell script to perform the task. I created a listing of files with gsutil and did some awk magic to get just the filenames into listing2.txt. I wrote the following loop.

This will rename the files stripping out what I wanted, files go from:

work-data-sample__0_0_1.csv.gz to data-sample__0_0_1.csv.gz

I launched it and noticed something odd: it was iterating over the list and making only one call at a time to the gcloud API to rename each file. This was going to take forever; it actually took 24 hours. I did some reading of the docs and saw that gsutil has a -m option for multiprocessing. I also checked the source code, and it looks like gsutil is multiprocess out of the box.

gsutil source code:

This is basically saying that if the OS can handle multiprocessing, let's spawn the same number of processes as the system has CPUs, and then set the thread count to 5. So my for loop in bash would have taken forever even with the -m option.

So I created some Python code that would solve this issue. It performs all the steps in one: it lists the files, substrings out the filename, and uses Python's multiprocessing to spawn 25 workers to do the API calls in chunks. I learned a lot from this and I hope it helps others. I will add comments in the code to show what's going on.

You can see the process spawns 25 worker processes that will iterate over the list and perform the move in chunks.
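A condensed sketch of the same approach, assuming the part to strip is a "work-" prefix as in the example above. The function names, the chunk size, and the bucket path in the usage note are hypothetical:

```python
import subprocess
from multiprocessing import Pool

PREFIX = "work-"  # assumed from the example file names above


def new_url(url, prefix=PREFIX):
    """Strip the prefix from the file name part of a gs:// URL."""
    folder, _, filename = url.rpartition("/")
    if filename.startswith(prefix):
        filename = filename[len(prefix):]
    return f"{folder}/{filename}"


def build_mv_command(url):
    """Build the gsutil command that renames one object in place."""
    return ["gsutil", "mv", url, new_url(url)]


def rename_one(url):
    """Rename a single object; returns (url, gsutil exit code)."""
    if new_url(url) == url:
        return url, 0  # nothing to strip, skip the API call
    result = subprocess.run(build_mv_command(url),
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
    return url, result.returncode


def rename_all(urls, workers=25, chunksize=100):
    """Spread the renames over a pool of worker processes, in chunks."""
    with Pool(processes=workers) as pool:
        return pool.map(rename_one, urls, chunksize=chunksize)
```

The list of URLs could come from the output of gsutil ls on the bucket path, one URL per line. Call rename_all(urls) from under an if __name__ == "__main__": guard so the worker processes can import the module cleanly.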

Python Function Execute Subprocess With Timeout

I have a project that rsyncs data from an RPM repository to maintain a local copy of the repo. The issue I faced was that the remote mirror would sometimes stop the rsync due to an overloaded network or other unforeseen issues. I wanted to use rsync's hashing algorithm to have it start right where it left off, so I wrote a function to do this. If 900 seconds was hit, it usually meant there was an issue with the transfer. I also want to state here that I observed the rsync stop-serving issue on many mirrors, so it was not just an issue with the TCP network. I use this in production, and it logs each iteration or restart. The function below also kills the current rsync so that multiple copies are not running at the same time. I also only wanted to perform 5 iterations of rsync upon error or timeout, so I use a while loop here.
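The retry logic described above can be sketched roughly as follows. This is an illustration rather than the production function: run_with_timeout_retries and its parameters are hypothetical names, with the 900 second timeout and 5 attempts taken from the description.

```python
import logging
import subprocess

log = logging.getLogger("rsync_retry")


def run_with_timeout_retries(cmd, timeout_s=900, max_attempts=5):
    """Run cmd, killing and restarting it on timeout or error.

    Returns True as soon as one attempt exits with status 0,
    or False once max_attempts have been used up.
    """
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        log.info("attempt %d of %d: %s", attempt, max_attempts, cmd)
        proc = subprocess.Popen(cmd)
        try:
            returncode = proc.wait(timeout=timeout_s)
        except subprocess.TimeoutExpired:
            # Kill the stalled process so two copies never run at once.
            proc.kill()
            proc.wait()
            log.warning("attempt %d timed out after %s seconds", attempt, timeout_s)
            continue
        if returncode == 0:
            return True
        log.warning("attempt %d exited with status %d", attempt, returncode)
    return False
```

Because rsync skips files that are already up to date on the destination, each restart effectively resumes the transfer instead of starting over.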

Here are the individual rsync commands in the INI configuration.

Here is how I call the execute_jobs_timeout() function:

The function:

Log Snippet showing each command executing:

Python Generator Find Files With Wildcard

This is a neat way to generate the names of files in a directory that match a specific pattern. I use this to generate a list of files exported out of Hive to load into S3.
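One way to write such a generator with only the standard library. The function name is hypothetical, and this version also descends into subdirectories:

```python
import fnmatch
import os


def find_files(directory, pattern):
    """Yield the path of every file under directory matching the wildcard."""
    for dirpath, _dirnames, filenames in os.walk(directory):
        for name in fnmatch.filter(filenames, pattern):
            yield os.path.join(dirpath, name)
```

Since it is a generator, paths are produced lazily. For example, iterating over find_files("/data/export", "*.csv") would hand each matching path to an upload step one at a time (the directory here is only a placeholder).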

Python3 Subprocess and Rsync Deadlock Strace Timeout

I recently came across a tough-to-debug issue where I was calling a shell script from Python using the subprocess module. The shell script called rsync, and no matter what I did, I would always run into a timeout. I fired up strace and noticed that the process was in a timeout state.

select(4, NULL, [3], [3], {60, 0}) = 0 (Timeout)

I looked at the subprocess documentation, and apparently a child process that writes enough output to a pipe can fill the OS pipe buffer and block.

Warning

This will deadlock when using stdout=PIPE and/or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use communicate() to avoid that.

I was baffled. In the end I dropped stdout and stderr entirely and just checked the return status of the command using run(). Here is what I finally came up with, and all was well.
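A minimal sketch of that approach, not necessarily the exact code used here. It assumes the child's output can simply be discarded via DEVNULL, and run_quiet is a hypothetical name:

```python
import subprocess


def run_quiet(cmd, timeout_s=None):
    """Run cmd with its output discarded and return only the exit status.

    No pipes are created, so the child can never block on a full
    pipe buffer, however much it writes.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        timeout=timeout_s,  # raises subprocess.TimeoutExpired if exceeded
    )
    return completed.returncode
```

If the output is actually needed, capture_output=True with run() reads both streams through communicate() internally, which also avoids the deadlock.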

Hope you find this and it helps you.